MLOps Lab Day 2 — All Labs

Jump to a lab:

Lab 1: Project Scaffolding

S3 Layout for SageMaker Project


Objective

Build a clean, reproducible baseline you’ll reuse across labs:

  • Install and activate Python 3.9 in a virtual environment (do not use system Python 2.7).
  • Create a versioned, encrypted S3 bucket in ap-northeast-2.
  • Establish standard prefixes: data/ (immutable raw) and artifacts/ (derived outputs).
  • Record environment variables for reuse.

Why this matters

  • Separation of concerns: raw data stays immutable in data/, while Processing/Training write to artifacts/. That’s how pros keep lineage clean.
  • Governance by default: S3 versioning + server-side encryption enable rollback and auditability.
  • Reproducibility: parameterized names (prefix/account/region) prevent collisions when many students run the same labs.

Prerequisites

  • You’re on the class VM (AWS CLI installed) and can run aws sts get-caller-identity.
  • Default region: ap-northeast-2.
  • You have a key pair to SSH if needed (we’ll mostly use CLI).

Step 0 — Verify account and region

Concept: Always verify “who/where” before creating resources.

aws sts get-caller-identity
aws configure get region || true

# Run this command to set region to ap-northeast-2 if not already set
aws configure set region ap-northeast-2

Success check: You see your 12-digit Account ID; region shows ap-northeast-2.


Step 1 — Install Python 3.9 and create a venv

Concept: The class VM ships a legacy system Python (2.7) alongside Python 3.9; always work inside a Python 3.9 virtual environment, never the system interpreter.

Check Python 3.9 installation:

python3 --version
dnf info python3 | sed -n '1,12p' || true

Create and activate a project venv:

mkdir -p ~/mlops-day2
cd ~/mlops-day2
python3 -m venv .venv
source ~/mlops-day2/.venv/bin/activate
python -V

Upgrade pip and install base libs:

python -m pip install --upgrade pip wheel
python -m pip install boto3 sagemaker pandas scikit-learn numpy matplotlib jupyter ipykernel

Tip: Next time you open a shell, re-activate with:

source ~/mlops-day2/.venv/bin/activate

Sanity check: confirm the interpreter version and that the base libraries import cleanly.

python -c "import sys; print(sys.version)"
python -c "import boto3, sagemaker, sklearn, pandas; print('OK: boto3', boto3.__version__)"
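If you prefer a guard that fails fast rather than a version print to eyeball, a small check like this can be run in the venv (a sketch; the helper name and the 3.9 floor check are ours, not part of any lab tooling):

```python
import sys

def meets_floor(version=sys.version_info, floor=(3, 9)):
    """Return True when the given (major, minor, ...) version is at least `floor`."""
    return tuple(version[:2]) >= floor

if not meets_floor():
    raise SystemExit(f"Need Python >= 3.9, found {sys.version.split()[0]}")
print("Interpreter OK:", sys.version.split()[0])
```

Running it inside the activated .venv should print the 3.9.x version; outside the venv (system Python) it exits with an error.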

Step 2 — Choose a unique project prefix

Concept: Buckets are global; we include a short prefix so multiple students don’t collide.

export LAB_PREFIX=<stu##>     # change to your own short tag (stu07, teamA, initials, etc.)
echo $LAB_PREFIX

If you want this prefix to persist for future shells, add it to ~/.bashrc later.


Step 3 — Parameterize names (account + region + bucket)

Concept: Compose names from prefix + account + region for uniqueness and traceability.

export AWS_REGION=ap-northeast-2
export ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
export BUCKET=${LAB_PREFIX}-${ACCOUNT_ID}-${AWS_REGION}-mlops
export S3_DATA=s3://${BUCKET}/data
export S3_ARTIFACTS=s3://${BUCKET}/artifacts
echo "Bucket will be: ${BUCKET}"
# Staging area for code bundles we upload for SageMaker jobs
export S3_CODE="s3://${BUCKET}/${LAB_PREFIX}/code"

# Standardized data sub-prefixes
export S3_DATA_RAW="${S3_DATA}/raw"
export S3_DATA_PROCESSED="${S3_DATA}/processed"

# Artifacts from jobs
export S3_ART_PREPROCESS="${S3_ARTIFACTS}/preprocess"
export S3_ART_TRAIN="${S3_ARTIFACTS}/training"

Check environment variables

echo $S3_CODE
echo $S3_DATA_RAW
echo $S3_DATA_PROCESSED
echo $S3_ART_PREPROCESS
echo $S3_ART_TRAIN
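The naming rule can also be validated programmatically. This is an illustrative helper (ours, not part of the lab scripts) that mirrors the ${LAB_PREFIX}-${ACCOUNT_ID}-${AWS_REGION}-mlops composition and checks it against the S3 bucket-name constraints that matter here (3–63 characters; lowercase letters, digits, dots, hyphens):

```python
import re

def compose_bucket(prefix: str, account_id: str, region: str) -> str:
    """Mirror the shell rule: ${LAB_PREFIX}-${ACCOUNT_ID}-${AWS_REGION}-mlops."""
    name = f"{prefix}-{account_id}-{region}-mlops"
    # S3 bucket names: 3-63 chars, start/end alphanumeric, lowercase/digits/dots/hyphens.
    if not re.fullmatch(r"[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]", name):
        raise ValueError(f"Invalid S3 bucket name: {name!r}")
    return name

print(compose_bucket("stu07", "123456789012", "ap-northeast-2"))
```

A prefix with uppercase letters or underscores (e.g., Stu_07) would be rejected before you ever call the AWS CLI.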

Step 4 — Create the S3 bucket with governance defaults

Concept: Create in-region, enable versioning, enable server-side encryption, and block public access.

Create the bucket:

aws s3api create-bucket \
  --bucket "${BUCKET}" \
  --region "${AWS_REGION}" \
  --create-bucket-configuration LocationConstraint="${AWS_REGION}"

Turn on governance:

aws s3api put-bucket-versioning \
  --bucket "${BUCKET}" \
  --versioning-configuration Status=Enabled

aws s3api put-bucket-encryption \
  --bucket "${BUCKET}" \
  --server-side-encryption-configuration '{"Rules":[{"ApplyServerSideEncryptionByDefault":{"SSEAlgorithm":"AES256"}}]}'

aws s3api put-public-access-block \
  --bucket "${BUCKET}" \
  --public-access-block-configuration BlockPublicAcls=true,IgnorePublicAcls=true,BlockPublicPolicy=true,RestrictPublicBuckets=true

See it in the console: open S3 → click your bucket → Properties → confirm Bucket versioning: Enabled and Default encryption: SSE-S3.


Step 5 — Create standard prefixes (data/ vs artifacts/)

Concept: We keep raw data immutable under data/; Processing/Training jobs will write derived outputs under artifacts/.

Create visible prefixes:

echo ok > /tmp/keep.txt

# Data layout
aws s3 cp /tmp/keep.txt "$S3_DATA_RAW/"
aws s3 cp /tmp/keep.txt "$S3_DATA_PROCESSED/"

# Artifacts layout
aws s3 cp /tmp/keep.txt "$S3_ART_PREPROCESS/"
aws s3 cp /tmp/keep.txt "$S3_ART_TRAIN/"

# Code staging for SageMaker (source_dir)
aws s3 cp /tmp/keep.txt "$S3_CODE/"

# Sanity check
aws s3 ls "$S3_DATA/" --recursive | head
aws s3 ls "$S3_ARTIFACTS/" --recursive | head
aws s3 ls "$S3_CODE/" --recursive

See it in the console: S3 → your bucket → Objects tab shows data/ and artifacts/.


Step 6 — (Optional) upload a small demo dataset

Concept: For a deterministic starting point, we’ll generate a tiny CSV. All transformations will happen later in a SageMaker Processing job (not locally), preserving lineage.

Generate + upload:

python - <<'PY'
import pandas as pd
from sklearn.datasets import load_breast_cancer
from pathlib import Path
X, y = load_breast_cancer(return_X_y=True, as_frame=True)
df = X.copy(); df['label'] = y
Path('local_data/raw').mkdir(parents=True, exist_ok=True)
df.to_csv('local_data/raw/breast_cancer.csv', index=False)
print("Local file created:", 'local_data/raw/breast_cancer.csv', "Rows:", len(df))
PY

aws s3 cp local_data/raw/ ${S3_DATA}/raw/ --recursive
aws s3 ls ${S3_DATA}/raw/

Step 7 — Save your environment for reuse

Concept: Persist the variables you’ll use across labs so every new shell is consistent.

cat > ~/mlops-env.sh <<EOF
export AWS_REGION=${AWS_REGION}
export LAB_PREFIX=${LAB_PREFIX}
export ACCOUNT_ID=${ACCOUNT_ID}
export BUCKET=${BUCKET}

export S3_DATA=${S3_DATA}
export S3_ARTIFACTS=${S3_ARTIFACTS}

# New: explicit sub-prefixes used by later labs
export S3_CODE=${S3_CODE}
export S3_DATA_RAW=${S3_DATA_RAW}
export S3_DATA_PROCESSED=${S3_DATA_PROCESSED}
export S3_ART_PREPROCESS=${S3_ART_PREPROCESS}
export S3_ART_TRAIN=${S3_ART_TRAIN}
EOF

echo "Reload with: source ~/mlops-env.sh"


# Use in any new shell:
# source ~/mlops-env.sh
What each variable is for:

  • $S3_CODE → source_dir staging (we upload your sagemaker/code/ here so SageMaker doesn’t fall back to the default bucket).
  • $S3_DATA_RAW → where you put raw CSVs (e.g., Telco) for Processing jobs to read.
  • $S3_DATA_PROCESSED → Processing writes train/, val/, test/ splits here.
  • $S3_ART_PREPROCESS → Processing writes preprocess.joblib, columns.json, schema.json, report.json.
  • $S3_ART_TRAIN → Training estimator output_path so model.tar.gz and metrics.json land in your bucket.
📦 S3 “code” vs Git

GitHub (your repo) is the source of truth. $S3_CODE is just a staging area SageMaker pulls from at job start. We upload there so the jobs never touch the default sagemaker-<region>-<acct> bucket and to keep all job inputs/outputs inside your bucket for clean lineage.
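To illustrate that staging flow, here is a hedged sketch (the helper and names are ours) that selects the Python entrypoints from a code-directory listing and maps each to its destination under $S3_CODE; the actual upload would then be an aws s3 sync, or one boto3 upload_file per pair:

```python
from posixpath import join as s3join  # S3 keys always use forward slashes

def stage_plan(code_uri: str, filenames: list[str]) -> dict[str, str]:
    """Map local .py entrypoint names to their S3 destinations under the code prefix."""
    picked = [f for f in filenames if f.endswith(".py")]
    return {f: s3join(code_uri, f) for f in picked}

plan = stage_plan("s3://stu07-123456789012-ap-northeast-2-mlops/stu07/code",
                  ["preprocess.py", "train.py", "inference.py", "notes.md"])
for local, dest in plan.items():
    print(local, "->", dest)
```

Non-code files (like notes.md above) are filtered out, so only the job entrypoints land in the staging prefix.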


Comprehension checkpoints

  1. Why is data/raw/ immutable while all transformations write to artifacts/?
  2. Which S3 features did we enable for governance and why?
  3. How do our parameterized names prevent collisions in a multi-student class?

Ideal answers

  1. Determinism and auditability; the same inputs + code → comparable outputs; raw isn’t mutated.
  2. Versioning (rollback/lineage) and SSE-S3 encryption (at-rest protection); plus public access block to avoid accidental exposure.
  3. By embedding prefix + account + region, bucket names are globally unique and self-describing.

Preview of Lab 2

We’ll run a SageMaker Processing job that reads data/raw/, scales features, and writes artifacts/lab1/train and artifacts/lab1/test, then a Training job that logs metrics to Experiments—with an explanation of /opt/ml/processing/* and /opt/ml/* mount points as we use them.

↑ Back to top


Lab 2: Set Up Development Environment

VS Code + GitHub + Jupyter in VS Code


Objective

  • Connect VS Code (Remote-SSH) to the Dev Box.
  • Select the Python 3.9 venv interpreter.
  • Enable visualizations inside VS Code (Jupyter/Interactive Window).
  • Configure Git on the Dev Box and GitHub access via SSH.
  • Install and sign in to the GitHub extensions in VS Code.
  • Initialize a clean repo structure and add code-quality hooks.

Prerequisites

  • Dev Box reachable via SSH (key pair ready).
  • Lab 1 done (S3 layout + Python 3.9 venv).
  • Region is ap-northeast-2 (Seoul).

Step 1 — Install VS Code and core extensions (on your laptop)

Concept: Author remotely with a full IDE; run code on the Dev Box.

  1. Install Visual Studio Code.
  2. Open Extensions (left sidebar) and install:
    • Remote - SSH (Microsoft)
    • Python (Microsoft)
    • Jupyter (Microsoft)

Windows note: All instructions work on Windows and macOS. We’ll add Windows SSH specifics below.


Step 2 — Configure Remote-SSH (on your laptop)

Concept: Use SSH to open and work on the remote Dev Box.

  1. In the EC2 console → Instances, copy your instance’s Public IPv4 DNS.
  2. Create or edit your SSH config:

Mac/Linux → ~/.ssh/config

Windows (OpenSSH) → %USERPROFILE%\.ssh\config

Host devbox
  HostName <EC2-Public-DNS>
  User ec2-user
  IdentityFile <path-to-your-.pem>
  3. In VS Code: Command Palette → Remote-SSH: Connect to Host… → select devbox.

Success check: Status bar shows SSH: devbox.


Step 3 — Open the project and select the Python 3.9 interpreter

Concept: Always use your venv so packages are consistent.

  1. File → Open Folder… → /home/ec2-user/mlops-day2.
  2. Trust the authors, if prompted.
  3. Install Python & Jupyter extensions on the remote
    • Open Extensions (left sidebar).
    • Search Python (Microsoft). If the button says Install in SSH: devbox, click that.
    • Do the same for Jupyter.
    • If you just installed, press ⌘⇧P / Ctrl+Shift+P → Developer: Reload Window.
  4. Open VS Code Terminal if not already open
    • View → Terminal (⌃` on macOS, Ctrl+` on Windows/Linux).
    • If the panel is hidden, View → Appearance → Toggle Panel (or ⌘J / Ctrl+J).
    • You should now see tabs like TERMINAL / OUTPUT / PROBLEMS.
  5. Select and activate the project venv
    cd ~/mlops-day2
    source .venv/bin/activate
    python -V
  6. Tell VS Code to use this venv interpreter
    • Click the interpreter name in the Status Bar, or press ⌘⇧P / Ctrl+Shift+P → Python: Select Interpreter → pick the .venv path you activated above.

Step 4 — Enable visualizations inside VS Code

Concept: EDA and plots can render inside VS Code (no extra ports).

Install tooling in your venv:

pip install --upgrade pip
pip install pandas matplotlib seaborn jupyter ipykernel
# optional but nice for labels:
sudo dnf -y install dejavu-sans-fonts

Option A — Native notebooks

  • File → New File… → choose Jupyter Notebook
  • Kernel picker (top-right) → choose your .venv interpreter.
  • Test:
import pandas as pd, matplotlib.pyplot as plt
df = pd.DataFrame({"x":[1,2,3], "y":[2,4,3]})
df.plot(x="x", y="y"); plt.show()

Option B — Interactive Window from .py (DO NOT USE THIS OPTION)

  • File → New File… → choose Python File (or Text File if unavailable; once you save the file with a .py extension, VS Code recognizes it as Python).

Add a cell marker and run the cell:

#%%
import pandas as pd, matplotlib.pyplot as plt
df = pd.DataFrame({"x":[1,2,3], "y":[2,4,3]})
df.plot(x="x", y="y"); plt.show()

Step 5 — Install Git and set your identity (on the Dev Box)

Concept: Commits should be authored by you; pushes use SSH.

Prerequisite: a GitHub account. Create one if you do not have one.

sudo yum install -y git
git --version
git config --global user.name  "<Your Name>"
git config --global user.email "<you@company.com>"

Step 6 — Create an SSH key and add it to GitHub (on the Dev Box)

Concept: SSH keys avoid tokens in files and work cleanly over Remote-SSH.

  • Generate key:
    ssh-keygen -t ed25519 -C "<you@company.com>"
    # Press Enter for default path; optionally add a passphrase
    
    # if you have added a passphrase run following command to keep in memory
    eval "$(ssh-agent -s)"
    ssh-add ~/.ssh/id_ed25519
  • Copy the public key:
    cat ~/.ssh/id_ed25519.pub
  • In GitHub: Settings → SSH and GPG keys → New SSH key → paste → Add SSH key.
  • Test:
ssh -T git@github.com
# Expect: "Hi <user>! You've successfully authenticated…"

Step 7 — Install GitHub extensions in VS Code and sign in (on your laptop)

Concept: Built-in Git works already, but these extensions streamline PRs and collaboration.

  1. In VS Code Extensions, install:
    • GitHub Pull Requests (GitHub)
    • GitLens — Git supercharged (highly recommended)
    • (Optional) GitHub Actions
  2. Sign in to GitHub in VS Code:

    Click the Accounts icon (bottom-left) → Sign in to GitHub.
    (You may need to sign in from a local VS Code window; sign-in can fail from the Remote-SSH window.)

    Accept the browser auth prompt and return to VS Code.

  3. When connected to SSH: devbox, VS Code may prompt to “Install on remote” for some extensions—click Install in Dev Container / SSH: devbox so features work server-side, too.

Result: You can open the GitHub view to see PRs/issues, blame lines with GitLens, and watch Actions runs.


Step 8 — Initialize your Git repo and scaffold the project (in ~/mlops-day2)

Concept: One clean repo per project is standard for MLOps. We’ll turn your existing ~/mlops-day2 folder (already has the .venv) into a Git repo, add a sensible SageMaker-oriented layout, and push to GitHub over SSH.

  • In the VS Code terminal (Remote-SSH → devbox): make the repo and set identity
cd ~/mlops-day2
git init -b main
git config user.name "Your Name"        # if you haven't already done this
git config user.email "you@company.com" # if you haven't already done this
  • Create the project structure (folders you’ll use across labs)
mkdir -p sagemaker/{code,pipelines,monitor} \
         scripts \
         infra/cloudformation \
         config \
         data/{raw,processed,models,monitoring} \
         tests \
         notebooks
  • Add starter files (placeholders we’ll fill in later labs)
# Processing / Training / Inference entrypoints
cat > sagemaker/code/preprocess.py <<'PY'
"""Feature preprocessing job entrypoint (SageMaker Processing)."""
import argparse
def main():
    p = argparse.ArgumentParser()
    p.add_argument("--input", required=True)
    p.add_argument("--output", required=True)
    # args = p.parse_args()
    print("Preprocess placeholder – will implement in Lab 3.")
if __name__ == "__main__":
    main()
PY

cat > sagemaker/code/train.py <<'PY'
"""Model training entrypoint (SageMaker Training)."""
import argparse
def main():
    p = argparse.ArgumentParser()
    p.add_argument("--train", required=True)
    p.add_argument("--val")
    p.add_argument("--model-dir", default="/opt/ml/model")
    p.add_argument("--output-data-dir", default="/opt/ml/output/data")
    # args = p.parse_args()
    print("Train placeholder – will implement in Lab 4.")
if __name__ == "__main__":
    main()
PY

cat > sagemaker/code/inference.py <<'PY'
"""Inference script for ScriptModel/BYOC – will implement in Lab 5."""
# define: model_fn(), input_fn(), predict_fn(), output_fn()
PY

# Pipeline scaffold
cat > sagemaker/pipelines/pipeline.py <<'PY'
"""SageMaker Pipeline scaffold – will wire Processing → Training → Register → Deploy in Lab 6."""
PY

# Monitoring placeholders
cat > sagemaker/monitor/baseline_config.json <<'JSON'
{ "todo": "Generated in monitoring lab" }
JSON
echo "Monitoring notes and runbooks will go here." > sagemaker/monitor/monitor_schedule.md

  • Add repo hygiene files
cat > .gitignore <<'GIT'
.venv/
__pycache__/
*.pyc
.ipynb_checkpoints/
.vscode/*
!.vscode/settings.json
data/
GIT

cat > README.md <<'MD'
# MLOps Day 2 Workspace

SageMaker jobs, pipelines, deployment, and monitoring artifacts for the course.

- sagemaker/code/: processing, training, inference entrypoints
- sagemaker/pipelines/: pipeline definition
- sagemaker/monitor/: baselines & schedules
- scripts/: helper CLI scripts
- infra/cloudformation/: optional infra (roles, alarms)
- data/: local staging (S3 is the source of truth)
- notebooks/: EDA
MD
  • Stage and commit your scaffold
git add .
git commit -m "Scaffold project structure for SageMaker labs"
  • Create an empty repo on GitHub (no README/license)
  • In GitHub UI: click New → name it (e.g., mlops-day2-<studentId>) → Create repository.
  • Connect the remote and push
git remote add origin git@github.com:<org-or-user>/<repo-name>.git
git push -u origin main
  • Verify
git status
git remote -v
  • In VS Code, open Source Control and GitLens to see remotes/commits.
  • The GitHub Pull Requests extension will let you raise PRs directly from the editor.

Why this layout?

It mirrors how production SageMaker projects evolve: separate entrypoints for Processing/Training/Inference, a Pipeline to orchestrate them, CI-friendly scripts/, and dedicated monitor/ for baselines/drift schedules—all versioned in one repo.
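If you want a quick self-check that your scaffold matches this layout, a throwaway sketch like the following can compare a directory listing against the expected folders (the helper and the EXPECTED set are ours, built from the mkdir command above; not part of the labs):

```python
# Top-level folders created by the scaffold step (subset shown for brevity)
EXPECTED = {
    "sagemaker/code", "sagemaker/pipelines", "sagemaker/monitor",
    "scripts", "infra/cloudformation", "config",
    "data", "tests", "notebooks",
}

def missing_dirs(present: set[str], expected: set[str] = EXPECTED) -> set[str]:
    """Return the expected project folders absent from `present`."""
    return expected - present

# In practice `present` would come from pathlib, e.g. {str(p) for p in Path(".").rglob("*") if p.is_dir()}
print(missing_dirs({"sagemaker/code", "scripts", "tests"}))
```

An empty set means the scaffold is complete.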

Step 9 — Add code quality hooks (pre-commit)

Concept: Production MLOps repos need repeatable, readable, and safe code. We enforce that with Git pre-commit hooks—tiny checks that run automatically every time you git commit. If a check fails, the commit is blocked until it’s fixed. This keeps the main branch clean before code ever hits CI.

What these checks do (and why it matters):

  • Formatting (e.g., Black or Ruff format) — auto-styles code so diffs only show real changes, not whitespace or quote noise. Faster reviews.
  • Linting (Ruff) — static analysis to catch bugs early: unused imports/vars, undefined names, shadowing, risky patterns, etc.
  • Import ordering (isort) — consistent import groups makes merges easier.
  • Security & hygiene — prevent accidental secrets, keys, or huge binary files from entering the repo.
  • Consistency across developers — hooks run locally on every laptop and in CI so your style and quality gates are identical.

  • Install tooling (inside your venv, at repo root ~/mlops-day2)
cd ~/mlops-day2
pip install --upgrade pip pre-commit black isort ruff bandit detect-secrets
  • Create pyproject.toml (at repo root)

    Goal: store formatting/linting settings so every tool and developer uses the same rules.

    • In VS Code: File → New File → Plain Text → name it pyproject.toml
    • Paste the content below and save:
    # pyproject.toml
    [tool.black]
    line-length = 100
    target-version = ["py39"]
    
    [tool.isort]
    profile = "black"
    line_length = 100
    
    [tool.ruff]
    line-length = 100
    target-version = "py39"
    lint.select = ["E","F","I","B","UP","S","W"]  # PEP8/flake8, import rules, bugbear, pyupgrade, security basics
    lint.ignore = ["E203"]  # compatible with Black
    
    [tool.bandit]
    skips = ["B101"]  # example: allow assert in tests
  • Create .pre-commit-config.yaml (at repo root)

    Goal: define the checks that auto-run on each git commit.

    • In VS Code: File → New File → Plain Text → name it .pre-commit-config.yaml
    • Paste the content below and save:
    # .pre-commit-config.yaml
    repos:
      # Basic hygiene
      - repo: https://github.com/pre-commit/pre-commit-hooks
        rev: v4.6.0
        hooks:
          - id: trailing-whitespace
          - id: end-of-file-fixer
          - id: check-merge-conflict
          - id: check-yaml
          - id: check-added-large-files    # blocks very large binaries in Git
    
      # Strip Jupyter outputs BEFORE any secret scan or lint runs
      - repo: https://github.com/kynan/nbstripout
        rev: 0.7.1
        hooks:
          - id: nbstripout
            files: \.ipynb$
    
      # Linters/formatters (Python only; Ruff owns import sorting with --fix)
      - repo: https://github.com/astral-sh/ruff-pre-commit
        rev: v0.6.9
        hooks:
          - id: ruff
            types: [python]
            args: ["--fix"]        # auto-fix (incl. import order I001)
          - id: ruff-format
            types: [python]
    
      - repo: https://github.com/psf/black
        rev: 24.8.0
        hooks:
          - id: black
            types: [python]
    
      - repo: https://github.com/PyCQA/bandit
        rev: 1.7.9
        hooks:
          - id: bandit
            args: ["-q", "-ll", "--skip", "B101"]
            types: [python]
    
      # Secrets scanning (skip .ipynb; nbstripout removes outputs anyway)
      - repo: https://github.com/Yelp/detect-secrets
        rev: v1.5.0
        hooks:
          - id: detect-secrets
            args:
              - "--baseline"
              - ".secrets.baseline"
              - "--exclude-files"
              - "\\.ipynb$"
      
  • Initialize the secrets baseline (so false-positives are tracked, not ignored silently)
detect-secrets scan > .secrets.baseline
git add .secrets.baseline
  • Activate hooks and run once over the whole repo
pre-commit install               # runs hooks automatically on each commit
pre-commit run --all-files       # first pass; will format & flag issues
If any hook fails, fix what it reports and re-run pre-commit run --all-files until clean.

Tip: Our placeholder code has several issues; fix them as the hook output instructs.

  • Commit & push the configuration (Using git from terminal)
git add pyproject.toml .pre-commit-config.yaml
git commit -m "chore: add pre-commit (format, lint, security, hygiene)"
git push
  • Commit & push the configuration (using the Git tool in VS Code)
    • Select Source Control
    • Move all Changes to Staged Changes
    • Enter a commit message
    • Click Commit
    • Click Push or Synchronize
  • (Optional) Run the same checks in GitHub Actions

Step 10 — VS Code workspace hygiene (once per repo)

Concept (what & why)

Your pre-commit hooks (trailing whitespace, final newline, Ruff/Black) may auto-fix files during commits. If VS Code isn’t enforcing the same rules on save, you’ll hit “unstaged changes” and rollbacks. These workspace settings keep the editor in sync with the hooks.
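To see what the trailing-whitespace and end-of-file fixers actually change, here is an illustrative sketch only (the real fixers live in the pre-commit-hooks repo):

```python
def normalize(text: str) -> str:
    """Sketch of trailing-whitespace + end-of-file-fixer behavior:
    strip trailing spaces on each line, end the file with exactly one newline."""
    lines = [line.rstrip() for line in text.splitlines()]
    return "\n".join(lines).rstrip("\n") + "\n"

print(repr(normalize("x = 1   \ny = 2")))  # 'x = 1\ny = 2\n'
```

The VS Code settings below make the editor apply the same two transformations on save, so commits never bounce on them.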

Run in your Dev Box terminal (in the repo root):

cd ~/mlops-day2
mkdir -p .vscode

# Editor behavior that matches our pre-commit hooks
cat > .vscode/settings.json <<'JSON'
{
  "files.trimTrailingWhitespace": true,
  "files.insertFinalNewline": true,
  "editor.formatOnSave": true,
  "[python]": {
    "editor.defaultFormatter": "charliermarsh.ruff"
  }
}
JSON

# (Optional) Recommend extensions to students
cat > .vscode/extensions.json <<'JSON'
{
  "recommendations": [
    "charliermarsh.ruff",
    "ms-python.python",
    "ms-toolsai.jupyter"
  ]
}
JSON

# (Optional, editor-agnostic) Help non-VS Code editors follow the same rules
cat > .editorconfig <<'CFG'
root = true

[*]
end_of_line = lf
insert_final_newline = true
charset = utf-8
trim_trailing_whitespace = true

[*.md]
trim_trailing_whitespace = false
CFG

# Let hooks auto-fix once, then re-stage and commit
git add .vscode/settings.json .vscode/extensions.json .editorconfig
pre-commit run --all-files
git add -A
git commit -m "chore: workspace hygiene (Ruff formatOnSave, trim whitespace, final newline)"
git push

Notes

  • This is repo configuration, not a notebook cell.
  • Keep the Ruff extension installed so editor.defaultFormatter works.
  • If you see ping-pong formatting between Ruff/Black, align your pre-commit config (or keep Ruff as the on-save formatter, which matches these settings).

Step 11 — Persist your AWS/S3 environment

Concept: Reuse the same variables across shells and CI later.

If not created in Lab 1, add:

cat > ~/mlops-env.sh <<'EOF'
export AWS_REGION=ap-northeast-2
export ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
export LAB_PREFIX=stu01   # change me!
export BUCKET=${LAB_PREFIX}-${ACCOUNT_ID}-${AWS_REGION}-mlops
export S3_DATA=s3://${BUCKET}/data
export S3_ARTIFACTS=s3://${BUCKET}/artifacts
EOF

Load it when needed:

source ~/mlops-env.sh

Step 12 — “Everything works” smoke test

Concept: Quick checks now save time later.

# In VS Code terminal, venv active
python -c "import boto3, sagemaker, pandas, sklearn; print('OK: libs loaded')"
aws s3 ls $S3_DATA/ || echo "Note: complete Lab 1 to create your bucket"
python - <<'PY'
import pandas as pd, matplotlib.pyplot as plt
df = pd.DataFrame({"x":[1,2,3], "y":[2,4,3]})
df.plot(x="x", y="y"); plt.savefig("quick_plot.png", dpi=120)
print("Saved quick_plot.png")
PY

Open quick_plot.png in VS Code to confirm plotting.


Comprehension checkpoints

  • Why keep AWS creds on the Dev Box (not on laptops)?
  • How do VS Code notebooks and the Interactive Window render visuals over SSH?
  • What value do the GitHub extensions add beyond built-in Git?

Ideal answers

  • Centralized, least-privilege creds reduce risk; no secrets on laptops.
  • The kernel runs on the Dev Box; results/plots are streamed back via the VS Code server over SSH.
  • PR review, issues, blame/history, and Actions status all in one place—faster team flow.

Next

With the environment locked in, we’ll move to Lab 3: run a SageMaker Processing job for EDA + cleaning (missing values, outliers, encoding plan), then a Training job with Experiments logging. We’ll tailor preprocessing to the dataset you choose (Telco Churn recommended; Adult as an optional advanced variant).

↑ Back to top


Lab 3: Interactive EDA in a VS Code Notebook

Telco Churn


Objective

Use a Jupyter Notebook (inside VS Code on your Dev Box) to explore the Telco Churn data on S3. You’ll learn what each check tells you and why it matters for downstream preprocessing, model choice, and evaluation.

You’ll practice

  • Reading directly from S3 (no local copies).
  • Understanding data types, missing values, imbalanced targets, outliers, and feature signals.
  • Writing a short, auditable EDA summary you can compare across runs.

Step 1 — Put the dataset in S3 ($S3_DATA_RAW/telco/)

Concept: In this course, S3 is the source of truth. Jobs read inputs from S3 and write versioned artifacts back to S3. You’ll download the Telco Churn CSV with curl, sanity-check it, then upload it to your own bucket under the raw-data prefix stored in the ${S3_DATA_RAW} environment variable.


  • Make sure Environment variables are set
# Pre-check: load your lab env so $S3_DATA_RAW/$BUCKET exist
source ~/mlops-env.sh
echo "BUCKET:        $BUCKET"
echo "S3_DATA_RAW:   $S3_DATA_RAW"
  • Download the dataset locally (to the Dev Box)
mkdir -p ~/datasets/telco
curl -fL \
  https://raw.githubusercontent.com/IBM/telco-customer-churn-on-icp4d/master/data/Telco-Customer-Churn.csv \
  -o ~/datasets/telco/telco_churn.csv

Why this file? It’s a widely used Telco churn sample with target column Churn (Yes/No). We rename it to a simple, less typo-prone name: telco_churn.csv.

  • Quick sanity checks (optional but recommended)
wc -l  ~/datasets/telco/telco_churn.csv          # ~7044 lines (header + rows)
head -5 ~/datasets/telco/telco_churn.csv         # peek at columns
grep -c ',Yes\|,No' ~/datasets/telco/telco_churn.csv || true  # rough target count
  • Upload to your S3 bucket (using ${BUCKET})
# Upload
aws s3 cp ~/datasets/telco/telco_churn.csv \
  "$S3_DATA_RAW/telco/telco_churn.csv"
  • Verify the object is in place
# Verify
aws s3 ls "$S3_DATA_RAW/telco/"
aws s3 cp "$S3_DATA_RAW/telco/telco_churn.csv" - | head -5
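The grep sanity check above is rough: it matches a ',Yes' or ',No' field in any column, not just the target. A precise per-class count can be sketched with the stdlib csv module (shown on a tiny inline sample; the Churn column name comes from this dataset):

```python
import csv
import io
from collections import Counter

def label_counts(csv_text: str, label_col: str = "Churn") -> Counter:
    """Count the values of the label column in CSV text."""
    reader = csv.DictReader(io.StringIO(csv_text))
    return Counter(row[label_col] for row in reader)

sample = "customerID,tenure,Churn\n0001,12,No\n0002,3,Yes\n0003,40,No\n"
print(label_counts(sample))  # e.g. Counter({'No': 2, 'Yes': 1})
```

In the notebook you would run the same counting on the real frame (df["Churn"].value_counts()); this stdlib version just shows what the shell one-liner approximates.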

Notes

  • No need to make the bucket or object public; your SageMaker jobs will access it via your IAM role.
  • Our Processing script looks for the first CSV under the input prefix, so the exact filename is not critical—standardizing to telco_churn.csv just reduces typos.
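That first-CSV behavior can be sketched as pure key-selection logic (our own helper; a real Processing script would obtain the key list via boto3's list_objects_v2):

```python
def first_csv_key(keys: list[str]) -> str:
    """Return the lexicographically first .csv key, mirroring a sorted S3 listing."""
    csvs = sorted(k for k in keys if k.lower().endswith(".csv"))
    if not csvs:
        raise FileNotFoundError("no CSV object under the input prefix")
    return csvs[0]

keys = ["data/raw/telco/keep.txt", "data/raw/telco/telco_churn.csv"]
print(first_csv_key(keys))  # -> data/raw/telco/telco_churn.csv
```

This is why the exact filename doesn't matter, but keeping a single CSV per prefix avoids surprises about which one gets picked.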

Step 2 — Create the notebook & select your venv kernel

Concept (what & why)

A kernel is the Python runtime that executes your notebook cells. We want it to use the same venv you created earlier so packages match your pipeline code.

  • In VS Code (Remote-SSH → your Dev Box), open /home/ec2-user/mlops-day2.
  • File → New File… → choose Jupyter Notebook → save as notebooks/eda_telco.ipynb.
  • Kernel picker (top-right) → choose your venv (e.g., Python (.venv)).

If you don’t see the venv

Command Palette → Python: Select Interpreter → pick .venv → re-open the notebook.

  • Quick VS Code Notebook primer:
    • Add Code or Markdown cells by hovering at the top/bottom of a cell and clicking + Code or + Markdown.
    • In a new Markdown cell, paste and run:
      # Exploratory Data Analysis
      
      ### Telco Churn
    • Run a cell with Shift+Enter or the ▶️ button on that cell.
    • For each step below, create a new cell (Code or Markdown) and run it.

Step 3 — Setup: security model, imports, config, and AWS client

Concept (what & why)

  • EDA = Exploratory Data Analysis: quick, visual checks to spot issues before modeling.
  • We’ll read from S3 (system of record) using boto3. No keys go in code — we rely on the AWS credential chain (best practice).
  • We load ~/mlops-env.sh for your bucket/prefixes to keep paths consistent with other labs.

Security model

  • On the Dev Box: boto3 finds credentials automatically (preferably via the EC2 instance profile attached to your VM, or an AWS CLI profile you configured earlier).
  • In SageMaker Processing/Training: SageMaker uses the execution role you pass to the job; your code reads/writes local /opt/ml/processing/* paths and the service syncs to S3.

3.a — Imports (put these at the very top of the notebook)

# Imports (keep in a single top cell)
import os
import io
import json
import subprocess
import shlex
from pathlib import Path

from datetime import datetime

import yaml

import boto3
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns

pd.set_option("display.max_columns", 100)
sns.set_theme()


3.b — Get project environment variables and save to global variables

Concept (what & why)

Your notebook should read environment variables that were already exported by earlier labs. If any are missing (e.g., in a brand-new kernel), we will source mlops-env.sh from inside the notebook; alternatively, set them in the VS Code Terminal and restart the notebook kernel. (In this lab, for convenience, we simply run the script from the notebook; this is not something you would normally do in a production environment.)

Run this cell:

# Step 3.b — Load project environment variables (with granular prefixes)

REQUIRED_BASE = ["AWS_REGION", "ACCOUNT_ID", "LAB_PREFIX", "BUCKET", "S3_DATA", "S3_ARTIFACTS"]
OPTIONAL_FINE = ["S3_CODE", "S3_DATA_RAW", "S3_DATA_PROCESSED", "S3_ART_PREPROCESS"]

def _source_env_file(env_file="~/mlops-env.sh", wanted=(REQUIRED_BASE + OPTIONAL_FINE)) -> dict:
    path = Path(env_file).expanduser()
    if not path.exists():
        return {}
    cmd = f"bash -lc 'set -a; source {shlex.quote(str(path))} >/dev/null 2>&1; env'"
    out = subprocess.check_output(cmd, shell=True, text=True)
    seen = {}
    for line in out.splitlines():
        if "=" not in line:
            continue
        k, v = line.split("=", 1)
        if k in wanted:
            seen[k] = v
    return seen

# Import what we can from current kernel env, then from ~/mlops-env.sh if needed
missing = [k for k in REQUIRED_BASE if not os.environ.get(k)]
if missing:
    os.environ.update(_source_env_file())

# Error if base vars still missing
missing = [k for k in REQUIRED_BASE if not os.environ.get(k)]
if missing:
    raise OSError(
        "Missing environment variables: "
        + ", ".join(missing)
        + "\nFix in VS Code Terminal: source ~/mlops-env.sh, then restart the kernel and re-run."
    )

# Bind coarse vars (from earlier labs)
REGION       = os.environ["AWS_REGION"]
ACCOUNT_ID   = os.environ["ACCOUNT_ID"]
LAB_PREFIX   = os.environ["LAB_PREFIX"]
BUCKET       = os.environ["BUCKET"]
S3_DATA      = os.environ["S3_DATA"]          # s3://.../data
S3_ARTIFACTS = os.environ["S3_ARTIFACTS"]     # s3://.../artifacts

# Try to read granular vars; if absent, derive them from the coarse ones (backward compatible)
S3_CODE            = os.environ.get("S3_CODE") or f"s3://{BUCKET}/{LAB_PREFIX}/code"
S3_DATA_RAW        = os.environ.get("S3_DATA_RAW") or f"{S3_DATA.rstrip('/')}/raw"
S3_DATA_PROCESSED  = os.environ.get("S3_DATA_PROCESSED") or f"{S3_DATA.rstrip('/')}/processed"
S3_ART_PREPROCESS  = os.environ.get("S3_ART_PREPROCESS") or f"{S3_ARTIFACTS.rstrip('/')}/preprocess"

print(json.dumps({
    "REGION": REGION, "ACCOUNT_ID": ACCOUNT_ID, "LAB_PREFIX": LAB_PREFIX, "BUCKET": BUCKET,
    "S3_CODE": S3_CODE, "S3_DATA_RAW": S3_DATA_RAW, "S3_DATA_PROCESSED": S3_DATA_PROCESSED,
    "S3_ART_PREPROCESS": S3_ART_PREPROCESS
}, indent=2))

Tip if you hit the error:

Open the VS Code Terminal and run source ~/mlops-env.sh. Confirm with echo $BUCKET. Then in the notebook click Kernel → Restart (top-right) and re-run the cell above. Sometimes you may need to reconnect your Remote-SSH session.

Tip: Keeping all globals here (and never modifying them later) makes the notebook deterministic and easier to grade/debug.

3.c — Create one AWS session/client and verify identity

# One session & client for the whole notebook (faster; cleaner; testable)
session = boto3.Session(region_name=REGION)
s3 = session.client("s3")

# Sanity check: who am I? (no secrets printed)
sts = session.client("sts")
who = sts.get_caller_identity()
print("Caller identity:", json.dumps(who, indent=2))

If this fails with a permissions error, your Dev Box likely lacks an instance profile or AWS CLI credentials. Fix that, then re-run.


Only if you hit a version mismatch warning later (do NOT run by default)

If you ever see a boto3/botocore dependency warning, run this once and restart the kernel:

%pip -q install --upgrade "boto3>=1.40,<1.41" "botocore>=1.40,<1.41"

Then Kernel → Restart Kernel and re-run the Step 3 cells.


Why this structure?

  • Imports first → avoid hidden state and re-import drift.
  • Globals next → one place to see/change paths; reproducible runs.
  • One shared client → reuses HTTP connections, cleaner design; pass s3 into helpers later.
  • Credential chain → no keys in code; least-privilege via instance profile or AWS profile.

Step 4 — Locate and load the dataset from S3

Concept (what & why)

S3 is the system of record. Real datasets can be a single CSV or many CSV shards under a folder (daily appends). We’ll support both, streaming straight from S3 with boto3 (no local files), using the shared s3 client you created in Step 3.


4.a — Define the dataset prefix (under your S3_DATA_RAW)

# Where the Telco CSVs live by convention in this course:
#   s3://<bucket>/data/raw/telco/
# You can override via TELCO_PREFIX in your shell if you used a different folder.
TELCO_PREFIX = os.environ.get("TELCO_PREFIX") or f"{S3_DATA_RAW.rstrip('/')}/telco/"
print("TELCO_PREFIX:", TELCO_PREFIX)

If you uploaded somewhere else, set it once and re-run:

# Example:
# TELCO_PREFIX = f"s3://{BUCKET}/datasets/telco/"

4.b — Helpers to work with S3 URIs and list CSVs

def split_s3_uri(s3_uri: str) -> tuple[str, str]:
    """Turn 's3://bucket/prefix' -> ('bucket','prefix')."""
    if not s3_uri.startswith("s3://"):
        raise ValueError(f"Not an s3 URI: {s3_uri}")
    no_scheme = s3_uri[len("s3://") :]
    parts = no_scheme.split("/", 1)
    bucket = parts[0]
    prefix = "" if len(parts) == 1 else parts[1]
    return bucket, prefix

def list_csv_objects(s3_client, bucket: str, prefix: str) -> list[str]:
    """List .csv keys under a prefix (handles pagination)."""
    keys, token = [], None
    while True:
        kwargs = {"Bucket": bucket, "Prefix": prefix}
        if token:
            kwargs["ContinuationToken"] = token
        resp = s3_client.list_objects_v2(**kwargs)
        for obj in resp.get("Contents", []):
            key = obj["Key"]
            if key.lower().endswith(".csv"):
                keys.append(key)
        if resp.get("IsTruncated"):
            token = resp["NextContinuationToken"]
        else:
            break
    return keys
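As an aside, boto3 also ships paginators that handle ContinuationToken for you. A minimal sketch of the same listing logic (equivalent to the loop above; the lab cells keep the explicit version so the mechanics stay visible):

```python
def list_csv_objects_paginated(s3_client, bucket: str, prefix: str) -> list[str]:
    """Same result as list_csv_objects, but boto3 handles the token bookkeeping."""
    paginator = s3_client.get_paginator("list_objects_v2")
    keys = []
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            if obj["Key"].lower().endswith(".csv"):
                keys.append(obj["Key"])
    return keys
```

Either version returns the same keys; the paginator simply hides the ContinuationToken loop.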

4.c — Discover files and load one or many

bucket, telco_prefix_key = split_s3_uri(TELCO_PREFIX)
csv_keys = list_csv_objects(s3, bucket=bucket, prefix=telco_prefix_key)

print(f"Found {len(csv_keys)} CSV file(s) under {TELCO_PREFIX}")
for k in csv_keys[:10]:
    print(" -", k)

if not csv_keys:
    raise FileNotFoundError(
        f"No CSVs under {TELCO_PREFIX}\n"
        "Upload your Telco CSV to this folder and re-run this cell."
    )

def read_csv_from_s3(s3_client, bucket: str, key: str, **pd_kwargs) -> pd.DataFrame:
    """
    Stream a single CSV from S3 into a pandas DataFrame.
    Tries utf-8 first, then falls back to latin-1 for quirky files.
    """
    obj = s3_client.get_object(Bucket=bucket, Key=key)
    body = obj["Body"].read()  # bytes
    try:
        return pd.read_csv(io.BytesIO(body), encoding="utf-8", **pd_kwargs)
    except UnicodeDecodeError:
        return pd.read_csv(io.BytesIO(body), encoding="latin-1", **pd_kwargs)

TARGET = "Churn"  # keep consistent across labs

if len(csv_keys) == 1:
    print("Loading single file…")
    df = read_csv_from_s3(s3, bucket, csv_keys[0], header=0)
else:
    print("Multiple files detected; loading and concatenating…")
    frames = []
    for i, key in enumerate(csv_keys, start=1):
        print(f"  [{i}/{len(csv_keys)}] {key}")
        part = read_csv_from_s3(s3, bucket, key, header=0)
        frames.append(part)
    df = pd.concat(frames, axis=0, ignore_index=True)

print(f"Loaded shape: {df.shape}")
display(df.head(3))

4.d — Quick sanity checks (shape, columns, target distribution)

print("Columns:", list(df.columns))
print("\nDtypes:\n", df.dtypes)

if TARGET in df.columns:
    print("\nTarget distribution (raw):")
    print(df[TARGET].value_counts(dropna=False))
else:
    raise KeyError(f"Expected target column '{TARGET}' not found. "
                   "Check your file(s) and header row.")

Why this design works well

  • No local copies: streams directly from S3 with your shared s3 client.
  • Single or many files: handles real-world “daily shard” layouts.
  • Deterministic: TELCO_PREFIX is the single source of truth for the dataset location.
  • Secure by default: credentials come from the AWS credential chain (instance profile / configured profile), not hard-coded keys.


Step 5 — Schema & quick type fixes

Concept (what & why)

  • Dtype (data type) determines what preprocessing we need: numeric columns can be scaled; categorical columns need encoding.
  • In the Telco dataset, TotalCharges often lands as text because a few rows are messy; we’ll coerce it to numeric (bad parses → NaN, which we’ll impute later).
  • Identifiers like customerID don’t help prediction and can cause models to “memorize”, so we drop them.
We’ll start from the df you created in Step 4, keep a copy for reference, and build a cleaned working frame.
# Keep a reference to the original dataframe from Step 4
df_raw = df.copy()

# Quick overview
print("Rows, Cols:", df_raw.shape)
display(df_raw.sample(min(5, len(df_raw)), random_state=42))
display(df_raw.dtypes)

Fix obvious types & drop identifiers

# Work on a clean copy
df = df_raw.copy()

# 1) Coerce 'TotalCharges' to numeric (bad parses become NaN)
if "TotalCharges" in df.columns:
    df["TotalCharges"] = pd.to_numeric(df["TotalCharges"], errors="coerce")

# 2) Drop identifier columns if present
for col in ["customerID", "CustomerID", "customer_id"]:
    if col in df.columns:
        df = df.drop(columns=[col])

# 3) (Optional) Trim surrounding whitespace in string columns
for c in df.select_dtypes(include="object").columns:
    df[c] = df[c].astype(str).str.strip()

# 4) Make sure the target exists and is string/categorical
TARGET = "Churn"
if TARGET not in df.columns:
    raise KeyError(f"Expected target column '{TARGET}' not found. Check your header and file(s).")

# Cast target to category (helps some downstream tools)
df[TARGET] = df[TARGET].astype("category")

# Show the result
df.info()

Sanity checks (nulls & target balance)

# Nulls per column (we'll impute later in the processing pipeline)
null_summary = df.isna().sum().sort_values(ascending=False)
display(null_summary[null_summary > 0].head(15))

# Target distribution (class imbalance is typical: more 'No' than 'Yes')
print("\nTarget distribution:")
print(df[TARGET].value_counts(dropna=False))

Why this matters

  • Getting types right now avoids silent failures later (e.g., scalers on strings).
  • Dropping IDs reduces leakage risk.
  • Making the target categorical clarifies downstream metrics and encoding behavior.
Keep going — in the next step we’ll profile missing values, outliers, and basic signal to guide the preprocessing choices.

Plain words

  • Coercion = force a string to number; bad strings become NaN.
  • We’ll impute (NaN → a sensible value) later in processing/training.
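To make coercion and imputation concrete, here is a tiny self-contained sketch on toy values (not the Telco data):

```python
import pandas as pd

# A messy text column like TotalCharges: some rows can't parse as numbers
raw = pd.Series(["29.85", " ", "151.65", "not-a-number"])

# errors="coerce" turns unparseable strings into NaN instead of raising
nums = pd.to_numeric(raw, errors="coerce")
print(nums.isna().sum())  # 2 — the blank and the bad string became NaN

# Later, imputation replaces NaN with a sensible value, e.g. the median
filled = nums.fillna(nums.median())
print(filled.tolist())
```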

Step 6 — Target balance: how many churners?

Concept (what & why)

  • Class imbalance = one label (usually No) massively outnumbers the other (Yes).
  • Imbalance makes accuracy misleading; we’ll prefer AUC/PR and use class weights in training.
assert "Churn" in df.columns, "Expected 'Churn' column"
vc = df["Churn"].value_counts(dropna=False)
display(vc.to_frame("count").assign(share=lambda t: (t["count"]/t["count"].sum()).round(3)))

sns.barplot(x=vc.index, y=vc.values)
plt.title("Target distribution: Churn"); plt.xlabel("Churn"); plt.ylabel("Count"); plt.show()

Plain words

  • If only 25% are Yes, a silly model that always predicts No is 75% “accurate”. That’s why we won’t celebrate pure accuracy later.
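Later we’ll pass class weights to the model; scikit-learn can compute the “balanced” weights directly. A small sketch on toy labels (the 75/25 split here is hypothetical):

```python
import numpy as np
from sklearn.utils.class_weight import compute_class_weight

# Toy labels: 75% "No", 25% "Yes" — shaped like a typical churn split
y = np.array(["No"] * 75 + ["Yes"] * 25)

# "balanced" gives each class weight n_samples / (n_classes * class_count)
weights = compute_class_weight(class_weight="balanced", classes=np.array(["No", "Yes"]), y=y)
print(dict(zip(["No", "Yes"], weights)))  # the minority "Yes" gets the larger weight
```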

Step 7 — Missingness: where & why (and what to do)

Concept (what & why)

  • We care which columns have missing values and how many (rate), not just a picture.
  • On Telco, TotalCharges is often text with blanks; when coerced to numeric, those blanks become NaN. Those rows are typically brand-new customers (tenure == 0).
  • This guides our fix: impute numeric with median and categorical with most-frequent later (fit on train only to avoid leakage).

7.a — Count and visualize only columns with nulls

null_counts = df.isna().sum().sort_values(ascending=False)
miss = (
    null_counts[null_counts > 0]
    .to_frame(name="null_count")
    .assign(null_rate=lambda t: (t["null_count"] / len(df)).round(4))
)

if miss.empty:
    print("No missing values detected.")
else:
    display(miss)

    # Bar chart (clearer than a full heatmap when few cols have NaNs)
    ax = miss.sort_values("null_count").plot(
        kind="barh", y="null_count", legend=False, figsize=(8, 4)
    )
    plt.title("Columns with missing values")
    plt.xlabel("Rows with NaN")
    plt.tight_layout()
    plt.show()

7.b — Telco-specific sanity: why is TotalCharges missing?

if "TotalCharges" in df.columns:
    n_nan = int(df["TotalCharges"].isna().sum())
    rate = n_nan / len(df)
    print(f"TotalCharges NaN rows: {n_nan} ({rate:.2%})")

    if n_nan:
        cols_to_show = [c for c in ["tenure", "MonthlyCharges", "TotalCharges"] if c in df.columns]
        display(df.loc[df["TotalCharges"].isna(), cols_to_show].head(10))

        if "tenure" in df.columns:
            vc = df.loc[df["TotalCharges"].isna(), "tenure"].value_counts().sort_index()
            print("tenure values among TotalCharges==NaN:")
            display(vc.to_frame("rows"))

(Optional) Heatmap — only if there’s enough missingness to see a pattern

if not miss.empty and miss["null_count"].sum() > 0:
    plt.figure(figsize=(8, 4))
    # Show at most 1000 rows to keep it readable
    subset = df.isna().iloc[: min(1000, len(df))]
    sns.heatmap(subset, cbar=False)
    plt.title("Missingness map (first 1000 rows)")
    plt.xlabel("Columns"); plt.ylabel("Rows (subset)")
    plt.show()

What you should observe

  • Often only TotalCharges has NaNs, and those rows have tenure == 0.
  • That means imputing TotalCharges with a median (computed on train only) is sensible; it won’t distort long-tenure customers.

What we’ll do later (Processing script)

  • Fit a SimpleImputer(strategy="median") for numeric columns and most_frequent for categoricals on the training split, then apply to val/test.
  • This prevents data leakage and gives a reproducible, auditable fix.

Plain words

  • Numeric nulls → use median (robust to outliers).
  • Categorical nulls → most frequent or an explicit Unknown.
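The train-only fit described above can be sketched with scikit-learn’s SimpleImputer (toy arrays; the real fit happens in the Lab 4 Processing script):

```python
import numpy as np
from sklearn.impute import SimpleImputer

# Fit the imputer on TRAIN only, then apply the same statistic to val/test
train = np.array([[10.0], [20.0], [np.nan], [40.0]])
test = np.array([[np.nan], [5.0]])

imp = SimpleImputer(strategy="median")
imp.fit(train)                      # learns the TRAIN median: 20.0
print(imp.transform(test).ravel())  # test NaN is filled with the TRAIN median
```

Fitting on train and reusing the fitted object on val/test is exactly what prevents leakage.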

Step 8 — Numeric distributions & outliers (IQR rule)

Concept (what & why)

  • Outliers are extreme values that can distort scaling and model fits.
  • IQR (Interquartile Range) is a simple, robust way to flag them: values below Q1−1.5×IQR or above Q3+1.5×IQR.
num_cols = df.select_dtypes(include=[np.number]).columns.tolist()
num_cols = [c for c in num_cols if c != "Churn"]
print("Numeric columns:", num_cols)

df[num_cols].hist(figsize=(12, 8), bins=30)
plt.suptitle("Numeric distributions", y=1.02); plt.show()

def iqr_outliers(s: pd.Series) -> int:
    q1, q3 = s.quantile([0.25, 0.75])
    iqr = q3 - q1
    lo, hi = q1 - 1.5 * iqr, q3 + 1.5 * iqr
    return int(((s < lo) | (s > hi)).sum())

outlier_summary = pd.Series({c: iqr_outliers(df[c].dropna()) for c in num_cols}).sort_values(ascending=False)
display(outlier_summary.to_frame("iqr_outlier_count"))

What the four histograms are telling you

  • SeniorCitizen (top-left)

    Only two values: 0 and 1. The tall bar at 0 means most customers are not senior citizens; the shorter bar at 1 are seniors.

    • Why your outlier table shows a big number here: the IQR rule isn’t meaningful for a binary column. Treat this as a categorical/binary flag, not something to “clip.” Nothing to fix.

  • tenure (top-right)

    Values run from 0 to 72 months. You’ll typically see a spike near 0 (brand-new customers) and a spread across the rest. No true extremes.

    • Takeaway: keep as numeric; we’ll scale it. Spikes near 0 are expected and relate to the TotalCharges issue you saw earlier.

  • MonthlyCharges (bottom-left)

    Roughly 20–120 with one main hump and a mild right tail.

    • Takeaway: looks clean; scale as numeric. If you ever saw impossible negatives or >500, that would be a data error.

  • TotalCharges (bottom-right)

    Long right tail (high totals for long-tenure customers) and many values near zero (new customers).

    • Takeaway: skewed but valid. We’ll impute the few NaNs (from blank strings) and scale. For linear models a log1p transform can help; tree models don’t need it.

About the outlier table (IQR method)

  • It counts rows outside [Q1 − 1.5·IQR, Q3 + 1.5·IQR].
  • For continuous variables here, you generally see 0 flagged (good—no crazy values).
  • For binary variables like SeniorCitizen, IQR is not meaningful and can flag a lot—ignore that signal; don’t drop/clip.

What to do next (and why)

  • Do not remove rows based solely on these plots. The “extremes” you see for TotalCharges are legitimate long-tenure customers.
  • Our Processing step will:
    1. Impute numeric (median) and categorical (most_frequent) on the train split only (prevents leakage).
    2. Scale numeric columns with StandardScaler.
    3. One-hot encode categoricals.
    4. Save the fitted preprocessors so validation/test and inference get identical treatment.

Plain words

  • If a column is very skewed, consider log transform or robust scalers later.
  • We don’t drop points here; we’re learning what needs attention in preprocessing.
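To see why a log transform helps skewed columns, a quick sketch on synthetic right-skewed data (not the Telco values):

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
# Synthetic right-skewed values, shaped like TotalCharges' long tail
skewed = pd.Series(rng.lognormal(mean=3.0, sigma=1.0, size=2000))

print(round(float(skewed.skew()), 2))            # strongly right-skewed
print(round(float(np.log1p(skewed).skew()), 2))  # much closer to symmetric
```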

Step 9 — Which categories seem related to churn?

Concept (what & why)

  • For categorical features (e.g., Contract, PaymentMethod), look at churn rate by category (not just counts).
  • This hints at useful indicators once we one-hot encode.
cat_cols = df.select_dtypes(include=["object", "category", "bool"]).columns.tolist()
cat_cols = [c for c in cat_cols if c != "Churn"]

def churn_rate_by(cat_col, top=12):
    t = (df.groupby(cat_col)["Churn"]
           .value_counts(normalize=True)
           .rename("share")
           .mul(100)
           .reset_index())
    return t[t["Churn"] == "Yes"].sort_values("share", ascending=False).head(top)

for col in [c for c in ["Contract", "PaymentMethod", "InternetService"] if c in cat_cols]:
    display(churn_rate_by(col))

if "Contract" in cat_cols:
    rates = churn_rate_by("Contract", top=10)
    sns.barplot(data=rates, x="Contract", y="share")
    plt.title("Churn rate by Contract (%)")
    plt.ylabel("Churn Yes (%)"); plt.xlabel("Contract"); plt.show()

Plain words

  • If Month-to-month contracts have much higher churn, that category has real signal.

Step 10 — Quick signals: correlation & mutual information

Concept (what & why)

  • Correlation (Pearson): how two numeric variables move together (−1 to +1). We map Churn to 0/1 to get a directional numeric link.
  • Mutual information (MI): how much knowing one variable reduces uncertainty about another (works for categorical).
from sklearn.metrics import mutual_info_score

df_corr = df.copy()
df_corr["ChurnBin"] = (df_corr["Churn"].map({"Yes": 1, "No": 0})).astype("float")

num_corr = df_corr[num_cols + ["ChurnBin"]].corr()["ChurnBin"].drop("ChurnBin").sort_values(ascending=False)
display(num_corr.to_frame("pearson_corr_with_churn"))

plt.figure(figsize=(6, 8))
sns.barplot(x=num_corr.values, y=num_corr.index)
plt.title("Numeric features: correlation with Churn (Yes=1)")
plt.xlabel("Pearson r"); plt.ylabel("Feature"); plt.show()

def mi_cat(col):
    return mutual_info_score(df_corr[col].astype(str), df_corr["Churn"])
mi = pd.Series({c: mi_cat(c) for c in cat_cols}).sort_values(ascending=False)
display(mi.to_frame("mutual_info_with_churn"))

How to read these results

Pearson correlation (numeric features)

  • What it measures: the linear relationship between a numeric feature and the target.
  • Range: −1 to +1
    • +1 = perfect positive linear relation (feature ↑ → churn ↑)
    • 0 = no linear relation (could still be non-linear!)
    • −1 = perfect negative linear relation (feature ↑ → churn ↓)
  • Rule of thumb (magnitude |r|):
    • ~0.1 weak • ~0.3 moderate • ~0.5+ strong (context matters)
  • Example (Telco):
    • tenure ≈ −0.35 → longer tenure tends to reduce churn.
    • MonthlyCharges ≈ +0.19 → higher monthly cost slightly increases churn.
  • Caveats: Sensitive to outliers; ignores non-linear patterns; categorical features must be encoded to use r.

Mutual information (numeric and categorical)

  • What it measures: how much knowing a feature reduces uncertainty about the target (captures any dependence—linear or not).
  • Range: ≥ 0 (no fixed upper bound for continuous features).
    • 0 = no information.
    • Higher = more information about churn.
    • Treat MI as a ranking metric across your features rather than a value with a universal threshold.
  • Units: sklearn reports MI in nats (natural log). For a binary target, perfectly informative binary features top out at ln 2 ≈ 0.693 nats; continuous features can exceed this with certain estimators.
  • Example (Telco):
    • Contract often near the top → contract type is very informative about churn.
    • Very low MI (≈0) features (e.g., gender) are usually weak signals on their own.
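You can verify the ln 2 ceiling yourself; a quick sanity check with mutual_info_score on a toy, perfectly informative binary feature:

```python
import math
from sklearn.metrics import mutual_info_score

# A balanced binary feature that perfectly predicts a binary target
feature = [0, 1] * 500
target = ["Yes" if f else "No" for f in feature]

mi = mutual_info_score(feature, target)
print(mi, math.log(2))  # a perfectly informative binary feature hits ln 2 ≈ 0.693 nats
```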

What to do with this

  • Shortlist features: keep the top-ranked MI features and numerics with larger |r|; reconsider those near zero (unless needed for business reasons).
  • Watch multicollinearity: if you find pairs of numerics with |r| > 0.9, consider keeping one to simplify models like logistic regression.
  • Plan preprocessing:
    • Numeric features with skew/outliers → scaling/robust scaler; check Winsorization or log transforms.
    • Categorical features with many levels → one-hot can explode dimensions; consider target/WOE encoding for tree-free models (later lab).
  • Remember: Correlation/MI ≠ causation. Use them to guide feature engineering, not to prove why churn happens.
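The |r| > 0.9 multicollinearity screen from the bullets above can be automated in a few lines; a sketch on synthetic columns (the feature names here are hypothetical):

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(42)
x = rng.normal(size=500)
df_demo = pd.DataFrame({
    "feat_a": x,
    "feat_b": 2.0 * x + rng.normal(scale=0.01, size=500),  # near-duplicate of feat_a
    "feat_c": rng.normal(size=500),                         # independent
})

corr = df_demo.corr().abs()
# Keep the upper triangle only, so each pair is reported once
upper = corr.where(np.triu(np.ones(corr.shape, dtype=bool), k=1))
pairs = [(r, c) for r in upper.index for c in upper.columns if upper.loc[r, c] > 0.9]
print(pairs)  # only the near-duplicate pair is flagged
```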

What to record in your EDA notes

  • 2–3 strongest and weakest signals (r/MI) and a one-line interpretation.
  • Any surprises (e.g., features you expected to matter but don’t).
  • Decisions you’ll carry into preprocessing (drop, encode, scale, transform).

Step 11 — Save a concise EDA summary (auditable)

Concept (what & why)

Notebooks are great for exploration, but teams need a small, repeatable artifact they can compare across runs and automate in CI. We’ll save a compact JSON and upload it to s3://$S3_ART_PREPROCESS/eda/….

If BUCKET / REGION aren’t set in this kernel, re-run Step 3.

If they aren’t set in your shell, open the VS Code Terminal and: source ~/mlops-env.sh, then restart the kernel.

# Step 11 — Save concise EDA summary to $S3_ART_PREPROCESS/eda/

summary = {
    "rows": int(len(df)),
    "cols": int(df.shape[1]),
    "target": "Churn",
    "target_counts": df["Churn"].value_counts(dropna=False).to_dict(),
    "null_columns": {k: int(v) for k, v in df.isna().sum().items() if v > 0},
    "top_numeric_corr": (num_corr.head(5).round(3).to_dict() if "num_corr" in globals() else {}),
    "top_categorical_mi": (mi.head(5).round(3).to_dict() if "mi" in globals() else {}),
    "generated_at": datetime.utcnow().isoformat(timespec="seconds") + "Z",
}
print(json.dumps(summary, indent=2))

stamp = datetime.utcnow().strftime("%Y%m%d-%H%M%S")
local_name = f"eda_notebook_summary_{stamp}.json"
local_path = Path.cwd() / local_name
local_path.write_text(json.dumps(summary, indent=2))
print(f"Wrote: {local_path}")

# Upload to $S3_ART_PREPROCESS/eda/
eda_bucket, eda_prefix = split_s3_uri(f"{S3_ART_PREPROCESS.rstrip('/')}/eda/")
eda_key = f"{eda_prefix.rstrip('/')}/{local_name}"
s3.upload_file(str(local_path), eda_bucket, eda_key)
print("Uploaded to:", f"s3://{eda_bucket}/{eda_key}")

What you just produced

  • A timestamped file like eda_notebook_summary_20250821-031205.json (the stamp format is %Y%m%d-%H%M%S)
  • Stored both locally (next to your notebook) and in S3 under s3://$S3_ART_PREPROCESS/eda/
  • Contains: row/col counts, target distribution, columns with missing values, and (if you ran them) the top numeric correlations and categorical mutual information scores

If you hit issues

  • NameError: num_corr / mi → run Step 9 cells first, or ignore (the code will store {}).
  • botocore.exceptions.NoCredentialsError → your kernel doesn’t have env vars; in VS Code Terminal run source ~/mlops-env.sh, then restart kernel and re-run Step 3.

Step 12 — Author a plan.yaml for Lab 4

We want to capture the decisions you just motivated in the EDA so Lab 4 can run non-interactively and reproducibly.

Concept (what & why)

We turn EDA observations into a small, explicit contract that the preprocessing job will enforce. This is how teams keep decisions reviewable (code review) and reproducible (CI/CD).

What we’re encoding

  • target column, split sizes, seed
  • imputation policy (numeric/categorical)
  • encoding policy for categoricals
  • whether to scale numerics
  • how to handle class imbalance

Code cell (creates local plan.yaml, uploads to $S3_ART_PREPROCESS/plan.yaml)

# Step 12 — Write plan.yaml (decisions derived from EDA) to $S3_ART_PREPROCESS/

plan = {
    "target": "Churn",
    "splits": {"test_size": 0.20, "val_size": 0.10, "random_state": 42, "stratify": True},
    "numeric_imputer": "median",              # robust; matches TotalCharges insights
    "categorical_imputer": "most_frequent",   # simple, effective for Telco
    "encoder": "onehot_ignore_unknown",       # avoids crashes at inference
    "scale_numeric": True,                    # StandardScaler on numeric slice
    "label_mapping": {"No": 0, "Yes": 1},
    "class_weight": "balanced",               # handle imbalance
    # Optional context for reviewers:
    "notes": {
        "why_numeric_median": "skew & outliers; median is robust",
        "why_ignore_unknown": "stability with new categories at inference",
        "eda_artifact_hint": "see $S3_ART_PREPROCESS/eda/ for report json"
    },
}

# Save alongside the notebook
plan_path = Path.cwd() / "plan.yaml"
plan_path.write_text(yaml.safe_dump(plan, sort_keys=False))
print("Local plan.yaml written:", plan_path)

# Upload to $S3_ART_PREPROCESS/plan.yaml
pbucket, pprefix = split_s3_uri(S3_ART_PREPROCESS)
plan_key = f"{pprefix.rstrip('/')}/plan.yaml"
s3.upload_file(str(plan_path), pbucket, plan_key)
print("Uploaded plan to:", f"s3://{pbucket}/{plan_key}")

Quality gate (optional)

# Quick existence checks
resp = s3.list_objects_v2(Bucket=pbucket, Prefix=pprefix.rstrip('/') + "/")
for obj in resp.get("Contents", []):
    if obj["Key"].endswith("plan.yaml") or "eda_notebook_summary" in obj["Key"]:
        print("✓", obj["Key"])

Plain words

This small file is the bridge between EDA (what we learned) and preprocessing (what we’ll do). It lets you audit/PR-review the choices separately from code.


Step 13 — Commit the notebook

Concept (what & why)

Version your EDA so teammates can review and reproduce it.

cd ~/mlops-day2
git add notebooks/eda_telco.ipynb
git add notebooks/eda_notebook_summary_*.json
git add notebooks/plan.yaml
git commit -m "EDA: Telco churn + plan.yaml for preprocessing"
git push

Tip: A nice way to let hooks auto-fix files before you commit is to run
pre-commit run --all-files. Here is a useful sequence:

# Save all files in VS Code first.

git status                     # (optional) see what changed
git add -A                    # 1) Stage EVERYTHING

pre-commit run --all-files    # 2) Let hooks auto-fix
git add -A                    # 3) Stage the auto-fixes

# 4) Run again until it says "All checks passed"
pre-commit run --all-files
# If it reformats again, repeat: git add -A && pre-commit run --all-files

git commit -m "feat: <your message>"   # 5) Now commit (hooks will pass)
git push

Troubleshooting:

  • Still seeing “Unstaged files detected.” → You forgot git add -A before running pre-commit.
  • Hooks keep changing files every run → Keep looping git add -A && pre-commit run --all-files until you get “All checks passed.” (Usually 1–2 loops.)
  • Notebook noise → We already run nbstripout. If it modifies, just re-stage and re-run.
  • Emergency bypass (skips the format/lint hooks entirely):
    git commit -m "checkpoint" --no-verify

Comprehension checkpoints — with model answers

  • What is EDA, in plain words? Why do it before modeling?

    Answer: Quick checks to understand and sanity-check the data (types, ranges, gaps, imbalance) so the model doesn’t learn from broken inputs. It prevents wasted training cycles and misleading metrics.

  • Why convert TotalCharges to numeric with errors="coerce"?

    Answer: Bad strings become NaN, which we can impute. Without this, numeric algorithms and plots fail or behave oddly.

  • What does “class imbalance” change for us?

    Answer: We won’t trust accuracy alone; we’ll track AUC/PR, precision/recall, and we’ll use class weights so the minority (Yes) matters during training.

  • What’s an outlier and why should we care?

    Answer: An unusually extreme value. It can skew means and scaling, causing unstable models. The IQR rule gives a simple way to flag them.

  • Correlation vs. Mutual Information—what’s the difference?

    Answer: Correlation measures linear numeric relationships (−1 to +1). MI captures any dependence (useful for categorical), but doesn’t indicate direction.


Mini-glossary (plain language)

  • EDA: Exploratory Data Analysis—sanity checks + quick visuals.
  • Distribution: How values are spread (center, spread, skew).
  • Imputation: Filling in missing values (e.g., median for numbers).
  • Outlier: Value far from most others; often flagged via IQR rule.
  • Class imbalance: One label is much more common than the other.
  • Leakage: Accidentally letting test/val information influence training.
  • Correlation: Linear association between two numeric variables.
  • Mutual Information: General dependence measure (works for categoricals).
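The leakage entry is easiest to see with a scaler; a toy sketch of why preprocessing statistics must come from train only:

```python
import numpy as np
from sklearn.preprocessing import StandardScaler

train = np.array([[1.0], [2.0], [3.0]])
test = np.array([[100.0]])  # an extreme point in the held-out split

# Correct: statistics come from train only
good = StandardScaler().fit(train)

# Leaky: the test point influences the mean/std used everywhere
leaky = StandardScaler().fit(np.vstack([train, test]))

print(good.mean_, leaky.mean_)  # the leaky mean is dragged toward the test point
```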

Next

You now have an interactive, visual understanding of the data that complements your automated Processing job.

In Lab 4 we’ll implement preprocessing as a SageMaker Processing job that consumes your plan.yaml and writes:
  • $S3_DATA_PROCESSED/{train,val,test}/…csv
  • $S3_ART_PREPROCESS/artifacts/{preprocess.joblib,columns.json,schema.json}
  • $S3_ART_PREPROCESS/report/report.json

↑ Back to top


Lab 4: Data Pre-processing with SageMaker Processing + Experiments

Telco Customer Churn (binary classification)


Note: This lab does not consume plan.yaml from EDA. We keep it simple and drive the job via command-line flags (target/split/seed) and your S3 env vars. You still produce artifacts (preprocess.joblib, schema.json, etc.) that training will reuse.

Why skip it here? (simplicity for the lab)

To keep this lab focused, we drive preprocessing via CLI args + env vars instead of reading a config file. That keeps the mechanics visible (you see exactly which flags control split sizes, seed, etc.) and avoids adding config-parsing code while you’re learning SageMaker Processing.

How plan.yaml fits in real projects

In production, teams usually make preprocessing config-driven:

  • Single source of truth: plan.yaml captures decisions from EDA (target, splits, imputers, encoders, scaling, label mapping, class weight policy).
  • Code reviewable & auditable: live in Git and/or versioned in S3; diffs show what changed and who approved it.
  • Reproducible runs: jobs read plan.yaml, apply it deterministically, and write the plan alongside artifacts for lineage.
  • Safer changes: you can tweak preprocessing (e.g., switch encoder, adjust split) without editing code.

Typical production pattern

  • Store plan.yaml at s3://$S3_ART_PREPROCESS/plan.yaml (also in Git).
  • The Processing script accepts an optional --plan-s3-uri and, if present, loads it and validates the fields (e.g., test_size + val_size < 1, allowed encoder names).
  • Precedence is made explicit (e.g., “plan wins over flags” or “flags override plan”), then the script proceeds with those settings and records the resolved plan in report.json.

Example (for a future iteration; not applicable in this lab):

# parse_args()
p.add_argument("--plan-s3-uri", default=None)

# after parse_args() in main()
if args.plan_s3_uri:
    import boto3, yaml
    s3 = boto3.client("s3")
    def _split(u):
        assert u.startswith("s3://"); b, k = u[5:].split("/", 1); return b, k
    b, k = _split(args.plan_s3_uri)
    body = s3.get_object(Bucket=b, Key=k)["Body"].read()
    plan = yaml.safe_load(body)

    # Example: resolve settings (plan takes precedence)
    target       = plan.get("target", args.target)
    splits       = plan.get("splits", {})
    test_size    = float(splits.get("test_size", args.test_size))
    val_size     = float(splits.get("val_size", args.val_size))
    random_state = int(splits.get("random_state", args.random_state))
    # encoder/imputer/scale flags could be read here too

    # (Optional) strict validation + echo the resolved config into report.json
else:
    target, test_size, val_size, random_state = args.target, args.test_size, args.val_size, args.random_state
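The “(Optional) strict validation” mentioned above could look like the checker below; the field names match this lab’s plan.yaml, but the function itself is an illustrative sketch:

```python
def validate_plan(plan: dict) -> None:
    """Fail fast on an invalid plan before any compute is spent."""
    allowed_encoders = {"onehot_ignore_unknown"}  # extend as the team adds options

    splits = plan.get("splits", {})
    test_size = float(splits.get("test_size", 0.2))
    val_size = float(splits.get("val_size", 0.1))
    if not (0 < test_size + val_size < 1):
        raise ValueError(f"test_size + val_size must be in (0, 1), got {test_size + val_size}")

    encoder = plan.get("encoder", "onehot_ignore_unknown")
    if encoder not in allowed_encoders:
        raise ValueError(f"Unknown encoder: {encoder!r}")

    if "target" not in plan:
        raise ValueError("plan must name a 'target' column")
```

Echoing the resolved (validated) settings into report.json then gives reviewers the exact configuration each run used.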

Where this helps most

  • Automating retrains (pipelines/CI) without notebook edits
  • Enforcing team conventions (e.g., handle_unknown="ignore")
  • Traceable decisions: EDA → plan.yaml → Processing outputs → Training

Objective

  • Put raw data in S3 and run a SageMaker Processing job (no notebooks) that:
    • explores the data (EDA summary),
    • fixes missing values,
    • handles numeric/categorical features,
    • scales numerics and one-hot encodes categoricals,
    • addresses class imbalance (compute class weights),
    • splits into train/val/test,
    • saves a reusable scikit-learn preprocessing pipeline artifact.
  • Understand the Processing file system (/opt/ml/processing/*) and why this pattern is an industry standard.

Why this matters (concepts)

  • Processing jobs are ephemeral, versioned, and auditable; inputs/outputs are cleanly mounted under /opt/ml/processing, making runs reproducible and portable.
  • Separating prep from train lets you reuse the same pipeline for retraining and inference.
  • Experiments create a searchable lineage of code + params + metrics + artifacts.
💡 We’re using granular S3 prefixes so code, raw data, processed data, and artifacts are cleanly separated:

$S3_CODE, $S3_DATA_RAW, $S3_DATA_PROCESSED, $S3_ART_PREPROCESS.


Step 1 — Check Prerequisites

  • Lab 1 & 2 complete (you have ~/mlops-day2, venv active, and S3 env vars).

    In any new shell, load your env:

    source ~/mlops-env.sh
    echo "LAB_PREFIX:         $LAB_PREFIX"
    echo "BUCKET:             $BUCKET"
    echo "S3_CODE:            $S3_CODE"
    echo "S3_DATA_RAW:        $S3_DATA_RAW"
    echo "S3_DATA_PROCESSED:  $S3_DATA_PROCESSED"
    echo "S3_ART_PREPROCESS:  $S3_ART_PREPROCESS"
    echo "SM_ROLE_ARN:        $SM_ROLE_ARN"
    
    cd ~/mlops-day2
    source .venv/bin/activate
  • Region is ap-northeast-2 (Seoul).
  • Your account has/will create a SageMaker execution role (we’ll set it up below).

Troubleshooting

If the format/lint checkers are preventing you from committing to Git:

Quickest (one-time) bypass — use --no-verify

If a commit is blocked but you just need to move on:

git add <files>
git commit -m "checkpoint" --no-verify   # short form: -n

This skips all pre-commit hooks for that commit only. Note that --no-verify only works from the terminal, not from the Git panel in VS Code.

# scripts/submit_batch_transform.py
"""
Run a SageMaker Batch Transform using the latest Approved model package in <LAB_PREFIX>-telco-churn.
Mirrors the working notebook flow (describe package → SKLearnModel → transformer), but:
  - uploads source_dir.tar.gz to YOUR project bucket via code_location
  - passes FEATURE_LIST_JSON and EVAL_JSON_S3 to inference.py
  - avoids dereferencing fields that can be None (no 'subscriptable' crashes)

Env expected (exported by ~/mlops-env.sh):
  AWS_REGION, LAB_PREFIX, BUCKET, S3_ARTIFACTS, SM_ROLE_ARN
"""

import logging
import os
import uuid
from datetime import datetime

import boto3

from sagemaker import Session
from sagemaker.sklearn.model import SKLearnModel

for name in ("sagemaker", "boto3", "botocore", "urllib3", "s3transfer"):
    logging.getLogger(name).setLevel(logging.WARNING)


# --- Env / clients ---
REGION = os.environ.get("AWS_REGION", "ap-northeast-2")
BUCKET = os.environ["BUCKET"]
LABP = os.environ.get("LAB_PREFIX", "student")
SM_ROLE_ARN = os.environ["SM_ROLE_ARN"]
S3_ARTIFACTS = os.environ["S3_ARTIFACTS"].rstrip("/")  # e.g., s3://<bucket>/<prefix>

boto_sess = boto3.Session(region_name=REGION)
sm = boto_sess.client("sagemaker")
s3 = boto_sess.client("s3")
sess = Session(boto_session=boto_sess)


def latest_approved_pkg_arn(group: str) -> str:
    r = sm.list_model_packages(
        ModelPackageGroupName=group,
        ModelApprovalStatus="Approved",
        SortBy="CreationTime",
        SortOrder="Descending",
        MaxResults=1,
    )
    lst = r.get("ModelPackageSummaryList", [])
    if not lst:
        raise SystemExit("No Approved model package. Approve one in Lab 7.")
    return lst[0]["ModelPackageArn"]


def describe_package_bits(pkg_arn: str):
    info = sm.describe_model_package(ModelPackageName=pkg_arn)
    c = info["InferenceSpecification"]["Containers"][0]
    image_uri = c["Image"]
    model_data_url = c["ModelDataUrl"]
    # Guard the optional metrics fields (they can be absent on some packages)
    quality = (info.get("ModelMetrics") or {}).get("ModelQuality") or {}
    eval_json_s3 = (quality.get("Statistics") or {}).get("S3Uri", "")
    return image_uri, model_data_url, eval_json_s3


def find_test_input() -> str:
    cands = []
    p = s3.get_paginator("list_objects_v2")
    for prefix in ("data/processed/", "artifacts/preprocess/"):
        for page in p.paginate(Bucket=BUCKET, Prefix=prefix):
            for o in page.get("Contents", []) or []:
                if o["Key"].endswith("test.csv"):
                    cands.append((o["LastModified"], o["Key"]))
    if not cands:
        raise SystemExit("No processed test.csv found.")
    _, key = max(cands, key=lambda x: x[0])
    return f"s3://{BUCKET}/{key}"


def main():
    group = f"{LABP}-telco-churn"
    pkg_arn = latest_approved_pkg_arn(group)
    image_uri, model_data_url, eval_json_s3 = describe_package_bits(pkg_arn)

    # Contract (columns) published by preprocessing
    feature_list_s3 = f"{S3_ARTIFACTS}/preprocess/columns.json"

    # Build a Model that packages our inference.py; keep code in OUR bucket.
    sk_model = SKLearnModel(
        model_data=model_data_url,
        image_uri=image_uri,
        role=SM_ROLE_ARN,
        entry_point="inference.py",
        source_dir="sagemaker/code",
        sagemaker_session=sess,
        code_location=f"{S3_ARTIFACTS}/code",  # <-- keeps source_dir.tar.gz in project bucket
        env={
            "EVAL_JSON_S3": eval_json_s3,
            "FEATURE_LIST_JSON": feature_list_s3,
        },
    )

    # Transformer (ephemeral batch job)
    out = f"{S3_ARTIFACTS}/batch/{datetime.utcnow().strftime('%Y%m%d-%H%M%S')}/"
    transformer = sk_model.transformer(
        instance_type="ml.m5.large",
        instance_count=1,
        strategy="MultiRecord",
        assemble_with="Line",
        output_path=out,
        max_payload=6,
        max_concurrent_transforms=2,
    )

    # Submit job (no log streaming)
    job_name = f"{LABP}-bt-{uuid.uuid4().hex[:8]}"
    input_s3 = find_test_input()

    # Start the transform but do NOT wait here (avoids streaming container logs)
    transformer.transform(
        data=input_s3,
        content_type="text/csv",
        split_type="Line",
        job_name=job_name,
        wait=False,  # <-- key: don't stream logs
    )

    print(f"Started: {job_name}")
    print(f"Input  : {input_s3}")
    print(f"Output : {out}")  # single source of truth for where predictions will land

    # Quiet poll loop (minimal status line; no container logs)
    import time

    while True:
        d = sm.describe_transform_job(TransformJobName=job_name)
        status = d.get("TransformJobStatus", "Unknown")
        print("Status:", status)
        if status in ("Completed", "Failed", "Stopped"):
            if status != "Completed":
                # print a concise failure reason if present (still quiet)
                fr = d.get("FailureReason")
                if fr:
                    print("FailureReason:", fr)
            # Optionally, confirm the exact S3 output SageMaker returns
            s3_path = (d.get("TransformOutput") or {}).get("S3OutputPath") or out
            print("Final output:", s3_path)
            break
        time.sleep(10)


if __name__ == "__main__":
    main()

Step 2 — Create (or reuse) a SageMaker execution role

Concept: SageMaker jobs assume an IAM role to access S3 and write logs/artifacts.

  • Create a role once (idempotent). For the class we’ll attach a broad policy to keep the lab simple (in real projects, scope to your bucket and specific actions).
# Run these from the terminal

ROLE_NAME="${LAB_PREFIX}-SageMakerMLOpsRole"
TRUST_JSON=/tmp/sm-trust.json
S3_INLINE_JSON=/tmp/sm-s3-bucket.json

# 1) Trust policy: let SageMaker assume this role
cat > "$TRUST_JSON" <<'JSON'
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": { "Service": "sagemaker.amazonaws.com" },
      "Action": "sts:AssumeRole"
    }
  ]
}
JSON

# 2) Create the role if it doesn't exist (idempotent) and tag it
aws iam get-role --role-name "$ROLE_NAME" >/dev/null 2>&1 || \
aws iam create-role \
  --role-name "$ROLE_NAME" \
  --assume-role-policy-document "file://$TRUST_JSON" \
  --tags Key=Owner,Value="$LAB_PREFIX" Key=Purpose,Value="mlops-course"

# 3) Attach managed policies (keep it simple for the lab)
aws iam attach-role-policy --role-name "$ROLE_NAME" \
  --policy-arn arn:aws:iam::aws:policy/AmazonSageMakerFullAccess || true
aws iam attach-role-policy --role-name "$ROLE_NAME" \
  --policy-arn arn:aws:iam::aws:policy/CloudWatchLogsFullAccess || true

# (Optional) If you plan BYOC images later, you may also need ECR access:
# aws iam attach-role-policy --role-name "$ROLE_NAME" \
#   --policy-arn arn:aws:iam::aws:policy/AmazonEC2ContainerRegistryPowerUser || true

# 4) Add an inline policy limiting S3 access to *your* bucket only
cat > "$S3_INLINE_JSON" <<JSON
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "ListOwnBucket",
      "Effect": "Allow",
      "Action": ["s3:ListBucket"],
      "Resource": "arn:aws:s3:::${BUCKET}"
    },
    {
      "Sid": "RWObjectsInOwnBucket",
      "Effect": "Allow",
      "Action": [
        "s3:GetObject","s3:PutObject","s3:DeleteObject",
        "s3:AbortMultipartUpload","s3:ListBucketMultipartUploads","s3:ListMultipartUploadParts"
      ],
      "Resource": "arn:aws:s3:::${BUCKET}/*"
    }
  ]
}
JSON

aws iam put-role-policy \
  --role-name "$ROLE_NAME" \
  --policy-name "${LAB_PREFIX}-S3BucketAccess" \
  --policy-document "file://$S3_INLINE_JSON"

# 5) Persist the ARN for later labs
SM_ROLE_ARN=$(aws iam get-role --role-name "$ROLE_NAME" --query 'Role.Arn' --output text)

# Update existing line if present; otherwise append
if grep -q '^export SM_ROLE_ARN=' ~/mlops-env.sh; then
  sed -i "s|^export SM_ROLE_ARN=.*|export SM_ROLE_ARN=${SM_ROLE_ARN}|" ~/mlops-env.sh
else
  echo "export SM_ROLE_ARN=${SM_ROLE_ARN}" >> ~/mlops-env.sh
fi

# Load it and show
source ~/mlops-env.sh
echo "Created/updated role:"
echo "  Name: ${ROLE_NAME}"
echo "  ARN : ${SM_ROLE_ARN}"

Note: In production, replace the wide policies with a least-privilege, bucket-scoped policy.

  • Verify that the role exists in the AWS Console: navigate to IAM → Roles, search for the role you created, open it, and confirm it has all the necessary permission policies.

Step 3 — Implement the Processing entrypoint (sagemaker/code/preprocess.py)

3.1 Understand the Processing job file system & parse arguments

Concept: Code runs inside the Processing container. SageMaker mounts S3 inputs/outputs to well-known paths:

  • Inputs (we’ll use one named raw): /opt/ml/processing/input/
  • Outputs (we’ll define train, val, test, artifacts, report):
    • /opt/ml/processing/train/
    • /opt/ml/processing/val/
    • /opt/ml/processing/test/
    • /opt/ml/processing/artifacts/
    • /opt/ml/processing/report/

In VS Code, create sagemaker/code/preprocess.py and paste the following at the top:

Note: Remove our previous placeholder code first.

import argparse
import json
import os
from pathlib import Path

import joblib
import numpy as np
import pandas as pd

import sklearn
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.utils.class_weight import compute_class_weight


def parse_args():
    p = argparse.ArgumentParser()
    p.add_argument("--target", default="Churn")
    p.add_argument("--test-size", type=float, default=0.2)
    p.add_argument("--val-size", type=float, default=0.1)
    p.add_argument("--random-state", type=int, default=42)
    return p.parse_args()


def io_paths_from_env():
    # Optional env overrides; defaults match the standard Processing mount
    # points, so the same script also supports local dry-runs.
    input_dir = Path(os.environ.get("SM_INPUT_DIR", "/opt/ml/processing/input"))
    out_train = Path(os.environ.get("SM_OUTPUT_TRAIN_DIR", "/opt/ml/processing/train"))
    out_val   = Path(os.environ.get("SM_OUTPUT_VAL_DIR",   "/opt/ml/processing/val"))
    out_test  = Path(os.environ.get("SM_OUTPUT_TEST_DIR",  "/opt/ml/processing/test"))
    out_art   = Path(os.environ.get("SM_OUTPUT_ARTIFACT_DIR", "/opt/ml/processing/artifacts"))
    out_rep   = Path(os.environ.get("SM_OUTPUT_REPORT_DIR",   "/opt/ml/processing/report"))
    for d in [out_train, out_val, out_test, out_art, out_rep]:
        d.mkdir(parents=True, exist_ok=True)
    return input_dir, out_train, out_val, out_test, out_art, out_rep

The code above defines two functions, parse_args and io_paths_from_env, which handle command-line argument parsing and input/output path configuration, respectively. Together they form the entry point of our preprocessing pipeline for the Telco Customer Churn dataset. Both are designed to run inside a SageMaker Processing job but also support local execution via default paths. Below is a detailed breakdown of each function:


Explainer — parse_args()

Purpose

  • Provide user-tunable knobs (target column, split sizes, random seed) without editing code.
  • Return a Namespace of parsed values you can pass around cleanly.

Arguments (with defaults)

  • --target (default "Churn")

    Target label in the Telco dataset ("Yes"/"No").

  • --test-size (default 0.2)

    Fraction reserved for test (20%).

  • --val-size (default 0.1)

    Fraction reserved for validation (10%).

  • --random-state (default 42)

    Seed for reproducible splits and transforms.

Logic (what happens)

  • Build an ArgumentParser.
  • Register the four flags above with types and defaults.
  • parse_args() reads flags from the command line and returns them.

Why it matters

  • Lets your pipeline be config-driven (e.g., try --test-size 0.3 without code edits).
  • Keeps experiments reproducible via --random-state.
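
To see the override behavior without a Processing job, you can parse an explicit argv list locally. This is a quick sketch; the parser mirrors parse_args() above.

```python
import argparse

# Mirror of parse_args() so the override behavior can be checked locally
p = argparse.ArgumentParser()
p.add_argument("--target", default="Churn")
p.add_argument("--test-size", type=float, default=0.2)
p.add_argument("--val-size", type=float, default=0.1)
p.add_argument("--random-state", type=int, default=42)

args = p.parse_args(["--test-size", "0.3"])  # only one flag overridden
print(args.target, args.test_size)  # Churn 0.3
```

All other flags keep their defaults, which is exactly how a Processing job with partial arguments behaves.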

Explainer — io_paths_from_env()

Purpose

  • Resolve where to read inputs and write outputs in a SageMaker Processing job.
  • Create output directories so later saves don’t fail.

Directories (env → default path)

  • SM_INPUT_DIR → /opt/ml/processing/input

    Where your CSV(s) are mounted by Processing.

  • SM_OUTPUT_TRAIN_DIR → /opt/ml/processing/train
  • SM_OUTPUT_VAL_DIR → /opt/ml/processing/val
  • SM_OUTPUT_TEST_DIR → /opt/ml/processing/test
  • SM_OUTPUT_ARTIFACT_DIR → /opt/ml/processing/artifacts

    Saved transformers, scalers, metadata.

  • SM_OUTPUT_REPORT_DIR → /opt/ml/processing/report

    EDA summaries, JSON reports.

Logic (what happens)

  • Read each env var with a local fallback so you can dry-run outside SageMaker.
  • Convert to Path objects for safer filesystem ops.
  • mkdir(parents=True, exist_ok=True) for all output dirs.

Why it matters

  • Processing jobs are ephemeral; the IO contract (/opt/ml/processing/...) is how results persist to S3.
  • Local fallbacks make the same script runnable in notebooks or unit tests.
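
A quick local dry-run of that fallback pattern, using a temp directory instead of the container paths (the SM_* names match the function above):

```python
import os
import tempfile
from pathlib import Path

tmp = Path(tempfile.mkdtemp())
os.environ["SM_OUTPUT_TRAIN_DIR"] = str(tmp / "train")  # simulate an override
os.environ.pop("SM_OUTPUT_VAL_DIR", None)               # ensure no override is set

# Same pattern as io_paths_from_env(): env var wins, container default otherwise
out_train = Path(os.environ.get("SM_OUTPUT_TRAIN_DIR", "/opt/ml/processing/train"))
out_val = Path(os.environ.get("SM_OUTPUT_VAL_DIR", "/opt/ml/processing/val"))
out_train.mkdir(parents=True, exist_ok=True)

print(out_train.exists())  # True: created under the temp dir
print(out_val)             # /opt/ml/processing/val (default kept)
```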

3.2 Load the CSVs and apply Telco-specific fixes

Concept: In real data lakes, a “dataset” is often a folder of many CSVs (daily appends/partitions), not a single file. We’ll support both: read all CSVs under the mounted input path, concatenate them safely (even if columns differ slightly), then apply Telco cleanups (drop identifier, coerce numerics, trim whitespace).

In VS Code, open sagemaker/code/preprocess.py and append below your previous code:

def _csv_paths(root: Path) -> list[Path]:
    """Return all CSV-like files under root, sorted for deterministic order."""
    files = sorted([*root.rglob("*.csv"), *root.rglob("*.csv.gz")])
    if not files:
        raise FileNotFoundError(f"No CSV files found under {root}")
    return files


def load_telco_dataframe(input_dir: Path) -> pd.DataFrame:
    """Load one or many CSVs, normalize schema, and apply Telco-specific fixes."""
    paths = _csv_paths(input_dir)

    # 1) Load all parts
    frames = []
    for p in paths:
        df_part = pd.read_csv(p)
        frames.append(df_part)

    # 2) Union columns across parts, then concat (keeps missing columns as NaN)
    all_cols = sorted(set().union(*(f.columns for f in frames)))
    frames = [f.reindex(columns=all_cols) for f in frames]
    df = pd.concat(frames, ignore_index=True, sort=False)

    # 3) Telco-specific cleanups
    #    a) 'customerID' is an identifier, not a predictive feature
    if "customerID" in df.columns:
        df = df.drop(columns=["customerID"])

    #    b) 'TotalCharges' sometimes arrives as string; coerce to numeric (bad parses -> NaN)
    if "TotalCharges" in df.columns:
        df["TotalCharges"] = pd.to_numeric(df["TotalCharges"], errors="coerce")

    #    c) Trim whitespace in all object columns to avoid "Yes " vs "Yes"
    obj_cols = df.select_dtypes(include=["object"]).columns
    for col in obj_cols:
        df[col] = df[col].str.strip()

    # 4) Basic hygiene: drop fully empty rows (rare, but harmless)
    df = df.dropna(how="all")

    # 5) Sanity check: ensure target exists
    if "Churn" not in df.columns:
        raise ValueError("Expected target column 'Churn' not found in input data.")

    return df

Explainer — _csv_paths()

Purpose

  • Treat the input as a directory of CSV parts or a single file; return them in a consistent order.

Key inputs / outputs

  • Input: root directory where SageMaker mounted your raw/ data.
  • Output: list of Path objects for every .csv / .csv.gz under root.

Logic (what happens)

  • Recursive glob for .csv and .csv.gz.
  • Sort the list for deterministic processing order.
  • Fail early with a clear error if nothing is found.

Why it matters

  • Mirrors common lakehouse patterns (daily/hourly partitions).
  • Keeps pipelines robust if the data delivery changes from single-file to multi-file.

Explainer — load_telco_dataframe()

Purpose

  • Ingest all CSV parts, align their schemas, and normalize Telco-specific quirks so downstream preprocessing is stable.

Key inputs / outputs

  • Input: input_dir (mounted by Processing).
  • Output: a cleaned pd.DataFrame with consistent columns and types.

Logic (what happens)

  • Read every CSV part.
  • Compute the union of columns; reindex each part to that union so concatenation is safe (missing columns become NaN).
  • Drop the identifier customerID.
  • Coerce TotalCharges to numeric (NaN for malformed values).
  • Strip whitespace in all string columns.
  • Drop fully empty rows; assert the Churn target exists.

Why it matters

  • Real feeds evolve; columns appear/disappear. Union-reindexing avoids crashes and makes schema drift explicit (missing columns show up as NaN instead of silent misalignment).
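
The union-reindex step is easiest to see on two tiny drifted parts (hypothetical columns a/b/c):

```python
import pandas as pd

part1 = pd.DataFrame({"a": [1], "b": [2]})
part2 = pd.DataFrame({"b": [3], "c": [4]})  # schema drifted: 'a' gone, 'c' new

# Same pattern as load_telco_dataframe(): union columns, reindex, then concat
all_cols = sorted(set().union(part1.columns, part2.columns))
frames = [f.reindex(columns=all_cols) for f in (part1, part2)]
df = pd.concat(frames, ignore_index=True, sort=False)

print(df.columns.tolist())        # ['a', 'b', 'c']
print(int(df["a"].isna().sum()))  # 1: the drift shows up as NaN, not a crash
```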

3.3 Create a lightweight EDA summary

Concept: Even in automated pipelines, we want a human-readable snapshot of the data that’s cheap to compute and safe to store with every run. We’ll summarize nulls, cardinality, numeric outliers (IQR rule), and class balance. This artifact will later be written to report.json so you can audit data quality across runs.

In VS Code, open sagemaker/code/preprocess.py and append below your previous code:

def eda_summary(df: pd.DataFrame, target: str) -> dict:
    """
    Build a compact, serializable EDA report.

    Why this shape:
    - Small enough to store per run (e.g., JSON in S3).
    - Focused on fields you can quickly compare across runs for drift/debug.
    """
    # 1) Identify numeric vs categorical columns (keep the target out of cat list)
    num_cols = df.select_dtypes(include=[np.number]).columns.tolist()
    cat_cols = [c for c in df.columns if c not in num_cols and c != target]

    # 2) Basic data quality signals
    null_counts = df.isna().sum().to_dict()              # missing values per column
    nunique     = df.nunique(dropna=True).to_dict()      # cardinality per column

    # 3) Outlier counts for numeric columns using Tukey's IQR rule
    #    (robust to non-normal data; fast to compute)
    outliers_iqr_count = {}
    for c in num_cols:
        s = df[c].dropna()
        if s.empty:
            outliers_iqr_count[c] = 0
            continue
        q1, q3 = s.quantile([0.25, 0.75])
        iqr = q3 - q1
        if not np.isfinite(iqr) or iqr == 0:
            outliers_iqr_count[c] = 0
            continue
        lower, upper = q1 - 1.5 * iqr, q3 + 1.5 * iqr
        outliers_iqr_count[c] = int(((s < lower) | (s > upper)).sum())

    # 4) Class balance for the target (keep raw values; mapping to 0/1 happens later)
    target_counts = df[target].value_counts(dropna=False).to_dict()

    # 5) Package into a JSON-friendly dict
    return {
        "rows": int(len(df)),
        "columns": df.columns.tolist(),
        "numeric_cols": num_cols,
        "categorical_cols": cat_cols,
        "null_counts": null_counts,
        "nunique": nunique,
        "outliers_iqr_count": outliers_iqr_count,
        "target": target,
        "target_counts": target_counts,
    }

Explainer — eda_summary()

Purpose

  • Produce a small JSON of the dataset’s health: missingness, uniqueness, outliers, and target balance—ideal for run-to-run comparisons and quick debugging.

Key inputs / outputs

  • Input: df (cleaned raw DataFrame), target (column name, e.g., "Churn").
  • Output: dict that can be dumped to report.json.

Logic (what happens)

  • Detect numeric vs categorical columns (excluding the target from categorical).
  • Compute null counts and cardinality per column.
  • Count numeric outliers via the IQR rule (fast, distribution-agnostic).
  • Capture target class counts to spot imbalance.

Why it matters

  • Gives you an audit trail of data quality with each pipeline run.
  • Highlights common issues (missingness, drifted categories, extreme values) before training.
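
The IQR rule from step 3 can be checked by hand on a tiny series with one obvious outlier:

```python
import pandas as pd

s = pd.Series([10, 11, 12, 13, 14, 100])  # 100 is the obvious outlier

# Tukey's rule, exactly as in eda_summary()
q1, q3 = s.quantile([0.25, 0.75])
iqr = q3 - q1
lower, upper = q1 - 1.5 * iqr, q3 + 1.5 * iqr
n_out = int(((s < lower) | (s > upper)).sum())
print(n_out)  # 1
```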

3.4 Split into train/val/test with stratification

Concept: We want reproducible splits that preserve class ratios (stratification) because churn is imbalanced. We’ll split once into train and a temporary holdout, then split that holdout into val and test using the requested proportions.

In VS Code, open sagemaker/code/preprocess.py and append below your previous code:

def split_data(
    X: pd.DataFrame,
    y: pd.Series,
    test_size: float,
    val_size: float,
    random_state: int,
):
    """
    Split into train/val/test with stratification on y.

    Why two-step split?
    - First: train vs (val+test) using (test_size + val_size).
    - Second: split that holdout into val and test using the *relative* ratio.

    Notes:
    - Stratification keeps the positive/negative churn ratio similar across splits.
    - If stratification is impossible (e.g., too few samples in a class), we fall back
      to non-stratified splits with a clear warning.
    """
    if test_size < 0 or val_size < 0 or (test_size + val_size) >= 1.0:
        raise ValueError("Require 0 <= test_size, val_size and (test_size + val_size) < 1.0")

    holdout = test_size + val_size
    rel_val = val_size / holdout if holdout > 0 else 0.0

    # Helper to attempt a stratified split, then fallback if it fails
    def _safe_split(Xa, ya, **kwargs):
        try:
            return train_test_split(Xa, ya, stratify=ya, **kwargs)
        except ValueError as e:
            # Common when a class has very few rows; continue without stratification.
            print(f"[WARN] Stratified split failed: {e}. Falling back to non-stratified split.")
            return train_test_split(Xa, ya, **{k: v for k, v in kwargs.items() if k != "stratify"})

    # 1) Train vs (Val+Test)
    X_train, X_temp, y_train, y_temp = _safe_split(
        X, y, test_size=holdout, random_state=random_state
    )

    # 2) (Val+Test) -> Val vs Test
    X_val, X_test, y_val, y_test = _safe_split(
        X_temp, y_temp, test_size=(1 - rel_val), random_state=random_state
    )

    # (Optional) Align indices to simple 0..n-1 for neat downstream concatenations
    X_train = X_train.reset_index(drop=True)
    X_val   = X_val.reset_index(drop=True)
    X_test  = X_test.reset_index(drop=True)
    y_train = y_train.reset_index(drop=True)
    y_val   = y_val.reset_index(drop=True)
    y_test  = y_test.reset_index(drop=True)

    return X_train, X_val, X_test, y_train, y_val, y_test

Explainer — split_data()

Purpose

  • Produce train, validation, and test splits that keep the churn ratio consistent across sets, making metrics comparable and hyperparameter tuning reliable.

Key inputs / outputs

  • Inputs: X (features), y (labels, still "Yes"/"No" here; mapping to 1/0 happens later), test_size, val_size, random_state.
  • Outputs: X_train, X_val, X_test, y_train, y_val, y_test.

Logic (what happens)

  • Validate that test_size + val_size < 1.
  • Split once into train vs (val+test) using the combined holdout size.
  • Split the holdout again into val and test using the relative validation fraction.
  • Try stratified splits; if a class is too rare (raises ValueError), fall back to non-stratified with a warning.

Why it matters

  • Balanced splits avoid misleading validation accuracy (e.g., predicting the dominant class).
  • Two-step splitting makes the exact val/test proportions easy to reason about and reproduce.
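
The two-step arithmetic is easy to verify on synthetic data. This sketch uses hypothetical sizes of 0.25/0.25 so the row counts come out exact:

```python
import pandas as pd
from sklearn.model_selection import train_test_split

X = pd.DataFrame({"x": range(100)})
y = pd.Series([1] * 40 + [0] * 60)  # 40% positive, plenty for stratification

test_size, val_size = 0.25, 0.25
holdout = test_size + val_size  # 0.5 -> 50 rows held out
rel_val = val_size / holdout    # half of the holdout goes to val

# Step 1: train vs (val+test); Step 2: holdout -> val vs test
X_tr, X_tmp, y_tr, y_tmp = train_test_split(
    X, y, test_size=holdout, stratify=y, random_state=42
)
X_val, X_te, y_val, y_te = train_test_split(
    X_tmp, y_tmp, test_size=(1 - rel_val), stratify=y_tmp, random_state=42
)

print(len(X_tr), len(X_val), len(X_te))  # 50 25 25
print(y_tr.mean())                       # 0.4: class ratio preserved
```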

3.5 Build the preprocessing pipeline (impute, encode, scale)

Concept: Real-world tabular data mixes numerics (with missing values) and categoricals (strings with typos/whitespace). We’ll build a scikit-learn preprocessing pipeline that:

  • imputes numerics with median and categoricals with most-frequent,
  • one-hot encodes categoricals (robust to unseen categories),
  • standard-scales only the numeric features (helps many models).

Note: At the top of your file, ensure this extra import exists:

from sklearn.pipeline import Pipeline

In VS Code, open sagemaker/code/preprocess.py and append below your previous code:

def build_preprocessor(X: pd.DataFrame):
    """
    Create a reusable preprocessing pipeline:
      - Numeric: median impute  → (later) standard scale
      - Categorical: most-frequent impute → one-hot encode (handle_unknown='ignore')

    Returns:
      preprocessor: ColumnTransformer that imputes both blocks and encodes categoricals
      scaler: StandardScaler that we will apply only to the numeric block AFTER transform
      num_cols: list of numeric column names (to locate the numeric block)
      cat_cols: list of categorical column names
    """
    # 1) Partition feature columns by dtype
    num_cols = X.select_dtypes(include=[np.number]).columns.tolist()
    cat_cols = [c for c in X.columns if c not in num_cols]

    # 2) Define per-block pipelines
    #    Numeric block: just impute here; we will scale AFTER the whole transform
    num_pipe = Pipeline(steps=[
        ("impute", SimpleImputer(strategy="median")),
    ])

    #    Categorical block: impute then one-hot encode.
    #    - handle_unknown='ignore' keeps the pipeline stable if new categories appear at inference time
    #    - sparse_output=False yields a dense matrix (easier to save as CSV later)
    cat_pipe = Pipeline(steps=[
        ("impute", SimpleImputer(strategy="most_frequent")),
        ("onehot", OneHotEncoder(handle_unknown="ignore", sparse_output=False)),
    ])

    # 3) Column-wise assembly
    #    Put numeric FIRST so we know its block is at the start of the transformed matrix.
    preprocessor = ColumnTransformer(
        transformers=[
            ("num", num_pipe, num_cols),
            ("cat", cat_pipe, cat_cols),
        ],
        remainder="drop",
        sparse_threshold=0.0,  # force dense output from the ColumnTransformer
        verbose_feature_names_out=False,  # cleaner output names if we ever inspect them
    )

    # 4) Scaler to apply only to the numeric slice after transform
    #    with_mean=False is safe when the overall matrix might be treated as sparse-like;
    #    it also avoids centering which can be unnecessary for one-hot parts.
    scaler = StandardScaler(with_mean=False)

    return preprocessor, scaler, num_cols, cat_cols


Explainer — build_preprocessor()

Purpose

  • Centralize all feature cleaning/encoding so training and inference use the same transforms.

Key inputs / outputs

  • Input: X (features-only DataFrame).
  • Output: (preprocessor, scaler, num_cols, cat_cols):
    • preprocessor: imputes numerics & categoricals and one-hot encodes categoricals.
    • scaler: applied only to the numeric slice after transformation.
    • num_cols / cat_cols: used to locate the numeric block in the transformed matrix.

Logic (what happens)

  • Identify numeric vs categorical columns from X’s dtypes.
  • Build two small pipelines:
    • Numeric: SimpleImputer(strategy="median")
    • Categorical: SimpleImputer(strategy="most_frequent") → OneHotEncoder(handle_unknown="ignore")
  • Assemble them with ColumnTransformer, placing numeric first so we know exactly where to apply scaling later.
  • Return a StandardScaler (deferred application) so we can fit on train only and reuse on val/test.

Why it matters

  • Encodes your preprocessing contract in code (not prose), making it reproducible across jobs.
  • handle_unknown="ignore" keeps production inference resilient when new categories arrive.
  • Deferring scaling lets us fit transforms on the train split only, avoiding leakage—covered next.

3.6 Fit on train only, transform val/test, compute class weights

Concept: To avoid data leakage, we fit imputers/encoders/scalers only on the training split, then apply them to validation/test. Because churn is imbalanced, we’ll also compute class weights that training can use.

  • Avoiding Data Leakage:
    • Data leakage happens when information from the validation or test data accidentally influences the training process, making the model seem better than it is.
    • To prevent this, we fit data preprocessing tools (like imputers for missing values, encoders for categorical variables, or scalers for numeric data) only on the training data. This ensures the model learns patterns only from the training set.
    • Then, we apply these fitted tools to the validation and test sets without re-fitting them, mimicking how the model will handle new, unseen data.
  • Handling Imbalanced Churn:
    • In the Telco dataset, the Churn column (e.g., "Yes" for churn, "No" for no churn) is imbalanced, meaning there are usually more "No" cases than "Yes" cases.
    • This imbalance can make the model biased toward predicting "No" (the majority class).
    • To fix this, we compute class weights based on the training data. These weights give more importance to the minority class ("Yes") during training, so the model pays attention to both classes fairly.

Open sagemaker/code/preprocess.py and append below your previous code:

def fit_and_transform(
    preprocessor,
    scaler: StandardScaler,
    num_cols: list[str],
    X_train: pd.DataFrame,
    X_val: pd.DataFrame,
    X_test: pd.DataFrame,
):
    """
    Fit transforms on TRAIN only; apply to VAL/TEST.
    We arranged the ColumnTransformer as [numeric, categorical], so the
    first len(num_cols) columns of the transformed arrays are numeric.
    We scale ONLY that numeric slice (no scaling for one-hot features).
    """
    # 1) Fit imputers/encoders on TRAIN, then transform all splits
    Xt_train = preprocessor.fit_transform(X_train)   # shape: [n_train, n_features_after_encoding]
    Xt_val   = preprocessor.transform(X_val)
    Xt_test  = preprocessor.transform(X_test)

    # 2) Scale only the numeric slice at the front (if any numerics exist)
    n_num = len(num_cols)
    if n_num > 0:
        # Fit scaler on TRAIN numerics, then apply to VAL/TEST numerics
        Xt_train[:, :n_num] = scaler.fit_transform(Xt_train[:, :n_num])
        Xt_val[:,   :n_num] = scaler.transform(Xt_val[:,   :n_num])
        Xt_test[:,  :n_num] = scaler.transform(Xt_test[:,  :n_num])

    return Xt_train, Xt_val, Xt_test


def compute_class_weights(y_train: pd.Series) -> dict[int, float]:
    """
    Compute per-class weights for imbalanced data.
    Returns a small dict like {0: w0, 1: w1} that we can pass to training.
    """
    # Ensure labels are integers (0/1). If strings, map them here.
    if y_train.dtype == "object":
        y_train = y_train.map({"No": 0, "Yes": 1}).astype("int64")

    classes_present = np.sort(y_train.unique())
    # If a class is missing in the current training fold, default to weight 1.0 for present classes.
    if len(classes_present) < 2:
        return {int(classes_present[0]): 1.0}

    weights = compute_class_weight(
        class_weight="balanced",
        classes=classes_present,
        y=y_train.values,
    )
    return {int(c): float(w) for c, w in zip(classes_present, weights)}

Explainer — fit_and_transform()

Purpose

  • Apply exactly the same cleaning/encoding to all splits while preventing leakage by fitting on train only.

Key inputs / outputs

  • Inputs: preprocessor (impute+encode), scaler (for numerics), num_cols (to locate the numeric block), and X_train/X_val/X_test.
  • Outputs: Xt_train, Xt_val, Xt_test — dense ndarrays with all features ready for modeling.

Logic (what happens)

  • fit_transform on train; transform on val/test.
  • Numeric features are placed first by design; scale only the first len(num_cols) columns.
  • Leave one-hot columns unscaled.

Why it matters

  • Prevents the validation/test sets from “seeing” the training statistics (means, modes, encodings), keeping metrics honest and reproducible.
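The train-only fitting rule can be sketched on toy numbers (a hypothetical one-feature array, not the Telco data): the scaler learns its mean/std from train and merely applies them to val.

```python
# Leakage guard in miniature: fit on TRAIN, transform VAL with TRAIN's stats.
import numpy as np
from sklearn.preprocessing import StandardScaler

X_train = np.array([[1.0], [2.0], [3.0]])   # train statistics: mean = 2.0
X_val = np.array([[4.0]])

scaler = StandardScaler()
Xt_train = scaler.fit_transform(X_train)    # learn mean/std from train only
Xt_val = scaler.transform(X_val)            # reuse those stats -- never refit on val

# Val is scaled with TRAIN's mean/std, so it can land outside train's range.
print(scaler.mean_[0])  # 2.0
```

If you instead called `fit_transform` on val, its own statistics would leak into the evaluation.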

Explainer — compute_class_weights()

Purpose

  • Quantify how much to up/down-weight each class so the model doesn’t ignore the minority churn class.

Key inputs / outputs

  • Input: y_train (Series of labels, ideally 0/1).
  • Output: {class_label: weight} (e.g., {0: 0.57, 1: 1.88}).

Logic (what happens)

  • Ensure labels are integers; if still strings, map "No"→0, "Yes"→1.
  • If both classes present, use compute_class_weight('balanced', ...).
  • If only one class appears (rare edge case), return {that_class: 1.0} to avoid crashes.

Why it matters

  • Improves recall/precision on the minority class without needing to oversample/undersample right now.
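As a hedged sketch, here is what scikit-learn's "balanced" heuristic produces on a toy 80/20 label Series (the numbers are illustrative, not the real Telco ratios):

```python
# balanced weight = n_samples / (n_classes * n_class_samples)
import numpy as np
import pandas as pd
from sklearn.utils.class_weight import compute_class_weight

y_train = pd.Series([0] * 80 + [1] * 20)   # 80/20 imbalance, like churn No/Yes

classes = np.sort(y_train.unique())
weights = compute_class_weight(class_weight="balanced", classes=classes, y=y_train.values)
cw = {int(c): float(w) for c, w in zip(classes, weights)}

# class 0: 100 / (2 * 80) = 0.625   class 1: 100 / (2 * 20) = 2.5
print(cw)  # {0: 0.625, 1: 2.5}
```

The minority class gets the larger weight, so each rare example counts proportionally more in the loss.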

3.7a Save splits and artifacts

Concept: Processing jobs should leave clean, reusable outputs for downstream steps. We’ll (1) save train/val/test as tidy CSVs with numeric features and a label column, (2) persist the fitted transformers (imputers, encoder, scaler) and metadata so training/inference can replay the exact preprocessing, and (3) prepare a simple report writer for the EDA summary.

In VS Code, open sagemaker/code/preprocess.py and append below your previous code:

def save_split(Xm, ym, path: Path):
    """
    Persist a split to CSV with columns: f0..fN, label.
    - Xm: 2D numpy-like array of features (after encoding/scaling)
    - ym: 1D pandas Series or array of labels (0/1)
    - path: destination CSV path
    """
    # Ensure arrays, align shapes, and build column names
    Xm = np.asarray(Xm)
    yv = np.asarray(ym).reshape(-1, 1)

    if Xm.ndim != 2:
        raise ValueError(f"Expected 2D features, got shape {Xm.shape}")
    if Xm.shape[0] != yv.shape[0]:
        raise ValueError(f"Row mismatch: X has {Xm.shape[0]} rows, y has {yv.shape[0]}")

    cols = [f"f{i}" for i in range(Xm.shape[1])] + ["label"]
    data = np.hstack([Xm, yv])

    df_out = pd.DataFrame(data, columns=cols)
    path.parent.mkdir(parents=True, exist_ok=True)
    df_out.to_csv(path, index=False)


def persist_artifacts(
    out_art: Path,
    preprocessor,
    scaler,
    num_cols: list[str],
    cat_cols: list[str],
    class_weights: dict[int, float],
    feature_count: int,
):
    """
    Save fitted preprocessing objects + metadata as a single joblib bundle.
    Also try to record feature names for debugging/inspection.
    """
    # Try to recover feature names from the fitted ColumnTransformer.
    # Falls back to generic f0..fN if unavailable.
    try:
        feature_names = list(preprocessor.get_feature_names_out())
    except Exception:
        feature_names = [f"f{i}" for i in range(feature_count)]

    bundle = {
        "preprocessor": preprocessor,  # fitted ColumnTransformer (imputers + one-hot)
        "scaler": scaler,  # fitted StandardScaler (numeric slice)
        "num_cols": list(num_cols),
        "cat_cols": list(cat_cols),
        "feature_names": feature_names,
        "feature_count": int(feature_count),
        "class_weights": class_weights,
        "version": {
            "pandas": pd.__version__,
            "numpy": np.__version__,
            "sklearn": sklearn.__version__,
        },
    }

    out_art.mkdir(parents=True, exist_ok=True)
    joblib.dump(bundle, out_art / "preprocess.joblib")


def write_report(report_dict: dict, out_rep: Path, filename: str = "report.json"):
    """
    Serialize a small, human-readable JSON report (EDA + split sizes, etc.).
    """
    out_rep.mkdir(parents=True, exist_ok=True)
    (out_rep / filename).write_text(json.dumps(report_dict, indent=2))

Explainer — save_split()

Purpose

  • Emit tidy CSVs (train.csv, val.csv, test.csv) with purely numeric features (f0..fN) and one label column—ready for fast ingestion by training jobs and easy spot checks.

Key inputs / outputs

  • Inputs: Xm (2D array), ym (1D labels), path (destination).
  • Output: CSV at the requested path; parent folders are created if missing.

Logic (what happens)

  • Validate shapes and row counts.
  • Build column names f0..fN + label.
  • Concatenate features and labels, write CSV without index.

Why it matters

  • Standardized schema across splits simplifies training scripts, debugging, and future model swaps.
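A quick round-trip on hypothetical tiny arrays shows the contract save_split() guarantees — columns f0..fN plus a single label column:

```python
# Write a 2-row split the same way save_split() does, then read it back.
import tempfile
from pathlib import Path

import numpy as np
import pandas as pd

Xm = np.array([[0.1, 0.2], [0.3, 0.4]])   # toy features (already encoded/scaled)
ym = np.array([0, 1])                      # toy binary labels

path = Path(tempfile.mkdtemp()) / "train.csv"
cols = [f"f{i}" for i in range(Xm.shape[1])] + ["label"]
pd.DataFrame(np.hstack([Xm, ym.reshape(-1, 1)]), columns=cols).to_csv(path, index=False)

df_back = pd.read_csv(path)
print(list(df_back.columns))  # ['f0', 'f1', 'label']
```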

Explainer — persist_artifacts()

Purpose

  • Store the fitted preprocessing stack (imputers, encoder, scaler) and metadata so training and inference can replay the exact transforms—critical for reproducibility.

Key inputs / outputs

  • Inputs: preprocessor, scaler, original num_cols/cat_cols, class_weights, and the feature_count.
  • Output: preprocess.joblib (single bundle with everything needed downstream).

Logic (what happens)

  • Attempt to extract feature_names from the fitted ColumnTransformer; otherwise fall back to f0..fN.
  • Package versions (pandas/numpy/scikit-learn) for auditability.
  • joblib.dump the bundle to the artifacts/ directory.

Why it matters

  • Guarantees the same encoding/scaling at training and serving time; avoids “works on my machine” drift.
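The downstream side can be sketched like this; a throwaway StandardScaler stands in for the lab's full preprocessing stack (the assumption is that any joblib-picklable fitted object round-trips the same way):

```python
# Reload the bundle the way a training/inference script would, then replay a transform.
import tempfile
from pathlib import Path

import joblib
import numpy as np
from sklearn.preprocessing import StandardScaler

scaler = StandardScaler().fit(np.array([[1.0], [3.0]]))   # stand-in fitted object
bundle = {"scaler": scaler, "feature_count": 1}

out = Path(tempfile.mkdtemp()) / "preprocess.joblib"
joblib.dump(bundle, out)

loaded = joblib.load(out)                                 # downstream consumer
Xt = loaded["scaler"].transform(np.array([[2.0]]))
print(Xt[0, 0])  # 0.0 -- 2.0 is exactly the train mean
```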

Explainer — write_report()

Purpose

  • Persist a compact JSON report (EDA, split sizes, class weights, etc.) alongside artifacts for quick human review.

Key inputs / outputs

  • Inputs: report_dict (serializable), out_rep (path), optional filename.
  • Output: report.json written to the report/ directory.

Logic (what happens)

  • Ensure the directory exists, pretty-print JSON with indentation.

Why it matters

  • Creates a simple audit trail across runs without opening notebooks or logs.

3.7b Write a schema for training (explicit columns + metadata)

Concept (what & why)

We just saved the transformed splits and the fitted preprocessing bundle. Now we’ll also save a tiny, human-and-machine friendly schema so training (and future readers) know exactly what’s in those CSVs.

💡 Why this matters
  • Training should never guess which column is the label (e.g., "Churn" vs "label").
  • By writing columns.json and schema.json, we make the contract explicit and stable:
    • features are f0..fN (all numeric)
    • label is the single column named label
  • Students can open these files to see the mapping and feature count at a glance.

What we’ll write next to preprocess.joblib

  • columns.json — the exact column order used in train/val/test.csv:

    {"columns": ["f0", "f1", ..., "fN", "label"]}

  • schema.json — small metadata for audits and downstream steps: raw target name (e.g., "Churn"), label column name ("label"), encoded feature count, optional feature names after encoding, and the label mapping used (e.g., {"No":0,"Yes":1}).
📎 Where this gets used

In 3.8 Orchestrate we’ll call this helper right after saving artifacts. Training will then rely on label (not "Churn") and can sanity-check feature counts with schema.json.


Paste this helper after your 3.7a code (below your save/persist helpers)

def write_schema_files(
    out_art: Path,
    feature_names_after_encoding: list[str],
    raw_target_name: str,
    label_name: str,
    label_mapping: dict,
) -> None:
    """
    Emit simple, human+machine friendly schema files next to preprocess.joblib.
    - columns.json: exact column order in the emitted CSVs (f0..fN + label)
    - schema.json: minimal metadata the trainer (and humans) can rely on
    """
    out_art.mkdir(parents=True, exist_ok=True)

    # 1) Columns file matches the CSVs we emit: f0..fN + label
    fcols = [f"f{i}" for i in range(len(feature_names_after_encoding))]
    (out_art / "columns.json").write_text(
        json.dumps({"columns": fcols + [label_name]}, indent=2)
    )

    # 2) Minimal but useful schema for audits and downstream consumers
    schema = {
        "raw_target": raw_target_name,                     # e.g., "Churn" in raw data
        "label_column": label_name,                        # "label" in our CSVs
        "feature_count": len(feature_names_after_encoding),
        "feature_names_after_encoding": feature_names_after_encoding,  # best effort
        "label_mapping": label_mapping,                    # e.g., {"No": 0, "Yes": 1}
        "contract": "features are f0..fN (float), label is int {0,1}",
    }
    (out_art / "schema.json").write_text(json.dumps(schema, indent=2))
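A minimal consistency check — using toy feature names rather than the real encoded Telco columns — confirms the invariant these files encode: columns.json always holds feature_count entries plus the single label column.

```python
# Recreate the two files with toy names, then verify they agree with each other.
import json
import tempfile
from pathlib import Path

out_art = Path(tempfile.mkdtemp())
feature_names = ["num__tenure", "cat__Contract_Month-to-month"]  # hypothetical

fcols = [f"f{i}" for i in range(len(feature_names))]
(out_art / "columns.json").write_text(json.dumps({"columns": fcols + ["label"]}, indent=2))
(out_art / "schema.json").write_text(json.dumps({"feature_count": len(feature_names)}, indent=2))

cols = json.loads((out_art / "columns.json").read_text())["columns"]
schema = json.loads((out_art / "schema.json").read_text())
print(len(cols), schema["feature_count"])  # 3 2
```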


3.8 Orchestrate preprocessing and write outputs

Concept: Wire everything together. We’ll (1) load raw CSV(s), (2) create an EDA summary, (3) extract features/labels, (4) split train/val/test with stratification, (5) fit transforms on train only, (6) transform val/test, (7) compute class weights, and (8) save splits, artifacts, and a report to the Processing output directories.

In VS Code, open sagemaker/code/preprocess.py and append below your previous code:

def extract_features_and_label(df: pd.DataFrame, target: str):
    """
    Split the DataFrame into features X and binary label y (0/1).

    - Accepts common string labels ('Yes'/'No', 'True'/'False', '1'/'0') case-insensitively.
    - Drops rows where the target is missing or un-mappable.
    - Returns (X, y) where y is int64 {0,1}.
    """
    if target not in df.columns:
        raise ValueError(f"Target column '{target}' not found in data")

    # Map common textual labels to integers
    if df[target].dtype == "object":
        mapping = {
            "no": 0, "yes": 1,
            "false": 0, "true": 1,
            "0": 0, "1": 1
        }
        y = df[target].astype(str).str.lower().map(mapping)
    elif df[target].dtype == bool:
        y = df[target].astype("int64")
    else:
        # numeric-ish: coerce and check it is binary
        y = pd.to_numeric(df[target], errors="coerce")
        unique = set(pd.unique(y.dropna()))
        if not unique.issubset({0, 1}):
            raise ValueError(
                f"Target '{target}' must be binary (0/1 or Yes/No). "
                f"Found values: {sorted(list(unique))[:10]}"
            )

    # Drop rows where target is NaN after mapping/coercion
    mask = y.notna()
    dropped = int((~mask).sum())
    if dropped > 0:
        print(f"[WARN] Dropping {dropped} rows with invalid/missing target labels")
    y = y.loc[mask].astype("int64")
    X = df.loc[mask].drop(columns=[target])

    return X, y


def main():
    # 0) Parse args and resolve I/O paths
    args = parse_args()
    input_dir, out_train, out_val, out_test, out_art, out_rep = io_paths_from_env()

    # 1) Load raw data (supports many CSV parts) and apply Telco fixes
    df_raw = load_telco_dataframe(input_dir)

    # 2) Lightweight EDA summary (human review)
    report = eda_summary(df_raw, args.target)

    # 3) Extract features (X) and label (y as 0/1)
    X_all, y_all = extract_features_and_label(df_raw, args.target)

    # 4) Build preprocessing stack (impute/encode + later scaling for numerics)
    preprocessor, scaler, num_cols, cat_cols = build_preprocessor(X_all)

    # 5) Train/Val/Test split (stratified), then fit on TRAIN only
    X_tr, X_va, X_te, y_tr, y_va, y_te = split_data(
        X_all,
        y_all,
        test_size=args.test_size,
        val_size=args.val_size,
        random_state=args.random_state,
    )
    Xt_tr, Xt_va, Xt_te = fit_and_transform(preprocessor, scaler, num_cols, X_tr, X_va, X_te)

    # 6) Handle class imbalance (weights from TRAIN)
    class_w = compute_class_weights(y_tr)
    feature_count = int(Xt_tr.shape[1])
    
    # 7) Recover human-readable feature names if the transformer provides them
    try:
        feat_names = list(preprocessor.get_feature_names_out())
    except Exception:
        feat_names = [f"f{i}" for i in range(feature_count)]

    # 8) Save splits (CSV) and artifacts (joblib bundle)
    save_split(Xt_tr, y_tr, out_train / "train.csv")
    save_split(Xt_va, y_va, out_val / "val.csv")
    save_split(Xt_te, y_te, out_test / "test.csv")
    persist_artifacts(
        out_art=out_art,
        preprocessor=preprocessor,
        scaler=scaler,
        num_cols=num_cols,
        cat_cols=cat_cols,
        class_weights=class_w,
        feature_count=feature_count,
    )
    
    # 9) write explicit schema files the trainer can rely on
    # We used extract_features_and_label() to map "Yes/No" → 1/0.
    # Recreate that mapping for the report/schema (students can see it).
    label_mapping = {"No": 0, "Yes": 1}
    write_schema_files(
        out_art=out_art,
        feature_names_after_encoding=feat_names,
        raw_target_name=args.target,   # e.g., "Churn" in raw data
        label_name="label",            # standardized output label name
        label_mapping=label_mapping,
    )


    # 10) Finalize and write report.json
    report.update({
        "splits": {
            "train_rows": int(len(y_tr)),
            "val_rows":   int(len(y_va)),
            "test_rows":  int(len(y_te)),
        },
        "class_weights": class_w,
        "feature_count_after_encoding": feature_count,
        "params": {
            "target": args.target,
            "test_size": args.test_size,
            "val_size": args.val_size,
            "random_state": args.random_state,
        },
    })
    write_report(report, out_rep)

    # 11) Friendly console summary
    print("=== Processing complete ===")
    print(f"Train: {out_train / 'train.csv'}")
    print(f"Val:   {out_val / 'val.csv'}")
    print(f"Test:  {out_test / 'test.csv'}")
    print(f"Artifacts: {out_art / 'preprocess.joblib'}")
    print(f"Report:    {out_rep / 'report.json'}")


if __name__ == "__main__":
    main()

Explainer — extract_features_and_label()

Purpose

  • Convert the raw target column (e.g., "Yes"/"No") into a clean binary label (0/1), and separate it from features.

Key inputs / outputs

  • Input: df (cleaned raw), target (e.g., "Churn").
  • Output: (X, y) where X excludes the target column and y is integer labels {0,1}.

Logic (what happens)

  • Accept common text or boolean forms for binary labels, case-insensitive.
  • Coerce or map to integers; drop rows with invalid/missing labels.
  • Return features without the target.

Why it matters

  • Models expect numeric labels; ambiguous string labels are a common source of bugs. This makes the contract explicit and robust.
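On a hypothetical four-row DataFrame, the mapping logic behaves like this — mixed-case strings become 0/1 and un-mappable rows are dropped:

```python
# Same mapping contract as extract_features_and_label(), on toy data.
import pandas as pd

df = pd.DataFrame({
    "tenure": [1, 2, 3, 4],
    "Churn": ["Yes", "no", "???", "NO"],   # mixed case plus one bad label
})

mapping = {"no": 0, "yes": 1, "false": 0, "true": 1, "0": 0, "1": 1}
y = df["Churn"].astype(str).str.lower().map(mapping)   # "???" -> NaN

mask = y.notna()                                       # drop un-mappable rows
y = y.loc[mask].astype("int64")
X = df.loc[mask].drop(columns=["Churn"])               # features exclude target

print(y.tolist(), len(X))  # [1, 0, 0] 3
```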

Explainer — main()

Purpose

  • Run the full preprocessing pipeline end-to-end and materialize outputs for the next lab steps (training, evaluation, and pipelines).

What it produces

  • train.csv, val.csv, test.csv (numeric features f0..fN + label)
  • preprocess.joblib (fitted encoders/imputers/scaler + metadata)
  • report.json (EDA + split sizes + class weights + parameters)

Why it matters

  • Encapsulates repeatable preprocessing that can run locally or inside SageMaker Processing with the exact same code.

3.9 Save, run the quality checks, commit, and push

Concept: We version everything. When you commit, our pre-commit hooks run formatters/linters/security checks to keep the repo clean. Some hooks auto-fix files (e.g., reformat), others fail and ask you to fix code (e.g., unused imports).

Save your work

  • In VS Code, press Cmd+S / Ctrl+S to save sagemaker/code/preprocess.py.

Terminal workflow (recommended)

  1. See what changed
    cd ~/mlops-day2
    git status
  2. Stage your changes
    git add sagemaker/code/preprocess.py
  3. Run all hooks before committing (fast feedback)
    pre-commit run --all-files
    • If it says it reformatted files (e.g., Black, ruff-format, isort), run it again until you get “All checks passed”.
    • If it reports errors (e.g., Ruff lint, Bandit security), read the messages and fix (see “Common failures” below).
  4. Commit (hooks will run again automatically)
    git commit -m "feat(processing): implement preprocessing pipeline and EDA"
    • If a hook modifies files during the commit, Git will abort the commit. Re-stage and commit again:
      git add -A
      git commit -m "style: apply auto-fixes from hooks"
  5. Push to GitHub
    git push

Doing it from VS Code UI (alternative)

  • Open Source Control (branch icon in the left sidebar).
  • Click the + next to your changed file to Stage it.
  • Enter a message (e.g., feat: add preprocessing script) and click Commit.
  • If hooks reformat during commit, the commit may be cancelled and files will show new changes; stage again and commit once more.
  • Click Sync / Push to send changes to GitHub.
  • Use GitLens to view the new commit in the repo history.

Common failures and how to fix them

  • Black / ruff-format / isort changed files
    • What you’ll see: “reformatted X files”, “Fixing …”.
    • What to do: Nothing else; run pre-commit run --all-files again, then re-stage and re-commit.
  • Ruff (lint) errors

    Typical messages and fixes:

    • F401 imported but unused → remove the unused import.
    • F841 local variable is assigned to but never used → remove the variable or use it.
    • E999 / syntax → fix typos, commas, mismatched parentheses.
    • S3xx / security → avoid dangerous calls or handle inputs safely.

      After edits: git add -A && pre-commit run --all-files.

  • Bandit (security) warnings
    • Read the finding; prefer fixing (e.g., avoid eval, sanitize inputs).
    • For this course config we already skipped a noisy rule (B101 assert).
    • Only if you’ve assessed a finding and it’s benign, you can mark a line with # nosec (use sparingly).
  • detect-secrets
    • If it flags something, remove the secret from code/config. Store secrets in AWS (e.g., Secrets Manager) or environment variables.
    • Update the baseline after cleaning:
      detect-secrets scan > .secrets.baseline
      git add .secrets.baseline
    • Re-run hooks and commit.
  • check-added-large-files
    • Don’t commit big binaries/datasets. Put them in S3. Add paths to .gitignore if needed.

Sanity check

git log --oneline -n 3
pre-commit run --all-files
  • You should see your commit(s) and “All checks passed”.

Why we do this

  • Ensures consistent style and clean diffs across the team.
  • Catches common bugs and risky patterns before code reaches CI.
  • Keeps secrets and giant files out of the repo.

Recap — What you learned in this step

  • Why SageMaker Processing mounts standard paths like /opt/ml/processing/* to keep input/output contracts clean and portable.
  • How to create a small, auditable EDA snapshot (nulls, uniques, outliers, class balance) that travels with each run.
  • How to do a proper train / val / test split with stratification so class ratios stay consistent.
  • Why we fit imputers/encoders/scalers only on the training split to prevent data leakage, then reuse them for val/test.
  • How to persist reusable artifacts (encoders, scaler, feature names, class weights) so training and inference replay the exact same preprocessing.
  • How to safely read a dataset delivered as many CSV files (union of columns, stable ordering, schema robustness).

What this step produced

  • report.json — compact EDA + split sizes + parameters (easy to diff across runs).
  • preprocess.joblib — fitted ColumnTransformer, StandardScaler, feature names/count, and class weights for reuse.
  • train.csv, val.csv, test.csv — dense numeric features f0..fN plus a single label column.

Common pitfalls (and how we guarded against them)

  • Mixed or messy types (e.g., "TotalCharges" as string): coerced to numeric with safe NaN handling.
  • Whitespace in categories (e.g., "Yes " vs "Yes"): trimmed all object columns.
  • Schema drift across many files: unioned columns and reindexed parts before concatenation.
  • Unseen categories at inference: OneHotEncoder(handle_unknown="ignore").
  • Imbalanced target: computed class weights from train to inform the model.
  • Leakage: fit transforms on train only, then transform val/test.

If something looks off, quick checks

  • Open report.json and confirm rows, numeric_cols, categorical_cols, and target_counts look reasonable.
  • Spot-check the first few lines of train.csv in VS Code (features should be all numbers, last column label is 0/1).
  • If feature_count_after_encoding seems too small/large, verify cat_cols and num_cols in your code and check for unexpected empty columns in the raw data.
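These spot checks can be scripted. The runnable sketch below fabricates a two-row stand-in for the downloaded train.csv so the assertions work anywhere; point it at the real file once you've copied it locally.

```python
# Sanity-check the split contract: all features numeric, label strictly binary.
import io

import pandas as pd

csv_text = "f0,f1,label\n0.5,-1.2,0\n1.1,0.3,1\n"   # stand-in for /tmp/train.csv
df = pd.read_csv(io.StringIO(csv_text))              # real use: pd.read_csv("/tmp/train.csv")

feature_cols = [c for c in df.columns if c != "label"]
all_numeric = df[feature_cols].dtypes.apply(pd.api.types.is_numeric_dtype).all()

print(all_numeric, set(df["label"]))
```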

Step 4 — Submit the Processing job from your Dev Box

Concept: A SageMaker Processing job spins up a short-lived container, mounts S3 folders into the container’s filesystem, runs your Python script, then copies the results back to S3. We don’t keep servers running; each job is a one-off “batch” run.

Terminology you’ll see in code:

  • Processor / SKLearnProcessor — helper that picks a container image and instance type to run your script.
  • ProcessingInput / ProcessingOutput — wiring between S3 paths and container paths (e.g., S3 raw/… → /opt/ml/processing/input/).
  • role (SM_ROLE_ARN) — an IAM role SageMaker assumes to read/write S3 and run jobs.
  • experiment / trial — (optional) “folders” for grouping jobs and comparing runs.

We will submit sagemaker/code/preprocess.py as a Processing job and write outputs (train/val/test, preprocess.joblib, report.json) back to your bucket.


4.1 Create a small driver script (submitter)

Mirror local code folder to your S3 code prefix

# Mirror your local code folder to your S3 code prefix
aws s3 sync sagemaker/code "$S3_CODE/" --delete
aws s3 ls "$S3_CODE/preprocess.py"

In VS Code: File → New File → save as scripts/submit_processing.py, then paste this first block.

# scripts/submit_processing.py
import os
from datetime import datetime

import boto3

import sagemaker
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor

# --- 1) Read environment (set in earlier labs) ---
REGION = os.environ.get("AWS_REGION", "ap-northeast-2")
BUCKET = os.environ["BUCKET"]            # e.g., stuxx-<acct>-ap-northeast-2-mlops
ROLE    = os.environ["SM_ROLE_ARN"]      # your per-student SageMaker role
LABP    = os.environ.get("LAB_PREFIX", "stuXX")

S3_CODE            = os.environ["S3_CODE"]
S3_DATA_RAW        = os.environ["S3_DATA_RAW"]
S3_DATA_PROCESSED  = os.environ["S3_DATA_PROCESSED"]
S3_ART_PREPROCESS  = os.environ["S3_ART_PREPROCESS"]

# Optional: these control split behavior for the script
TARGET = os.environ.get("TARGET_COL", "Churn")
TEST_P = float(os.environ.get("TEST_SIZE", "0.20"))
VAL_P  = float(os.environ.get("VAL_SIZE", "0.10"))
SEED   = int(os.environ.get("RANDOM_STATE", "42"))

# --- 2) Set up a SageMaker session bound to your region ---
boto_sess = boto3.Session(region_name=REGION)
sm_sess   = sagemaker.Session(boto_session=boto_sess)

Why this block exists

  • We pull your bucket/role/region from environment so every student runs in their own sandbox.
  • We create a SageMaker session (region-aware) to submit the job.

Now add the processor (container + instance config). Paste below the previous code:

# --- 3) Choose a container and instance type for Processing ---
# We use SKLearnProcessor because our script imports scikit-learn/pandas/numpy.
# If you ever hit an image-version error in your region, try a nearby version (e.g., "1.3-1").
processor = SKLearnProcessor(
    framework_version="1.2-1",     # SageMaker-provided sklearn image (includes numpy/pandas)
    role=ROLE,
    instance_type="ml.m5.large",   # balanced CPU for preprocessing; change if needed
    instance_count=1,
    base_job_name=f"{LABP}-preprocess",
    sagemaker_session=sm_sess,
)

Notes

  • ml.m5.large is a good default for this dataset. If you need faster runs, try ml.m5.xlarge; if you’re cost-sensitive, ml.t3.medium is cheaper but slower.
  • framework_version picks a managed scikit-learn image so you don’t build anything.

Add inputs/outputs wiring. Paste below:

# --- 4) Wire S3 <-> container paths ---
# S3 (left) will be mounted inside the container (right).
# Our script defaults to these /opt/ml/processing/* locations.

inputs = [
    ProcessingInput(
        source=f"{os.environ['S3_DATA_RAW']}/telco/",
        destination="/opt/ml/processing/input",
    ),
]

outputs = [
    ProcessingOutput(
        output_name="train",
        source="/opt/ml/processing/train",
        destination=f"{S3_DATA_PROCESSED}/train/",
    ),
    ProcessingOutput(
        output_name="val",
        source="/opt/ml/processing/val",
        destination=f"{S3_DATA_PROCESSED}/val/",
    ),
    ProcessingOutput(
        output_name="test",
        source="/opt/ml/processing/test",
        destination=f"{S3_DATA_PROCESSED}/test/",
    ),
    ProcessingOutput(  # fitted transformers + schema/columns
        output_name="artifacts",
        source="/opt/ml/processing/artifacts",
        destination=f"{S3_ART_PREPROCESS}/artifacts/",
    ),
    ProcessingOutput(  # EDA report
        output_name="report",
        source="/opt/ml/processing/report",
        destination=f"{S3_ART_PREPROCESS}/report/",
    ),
]

Why these paths

  • They match what your script expects (io_paths_from_env() uses these defaults).
  • You’ll find your outputs later under …/data/processed/... and …/artifacts/preprocess/ in your bucket.

Add the job kick-off. Paste below:

# --- 5) Kick off the job ---
job_name = f"{LABP}-preprocess-{datetime.utcnow().strftime('%Y%m%d-%H%M%S')}"

print(f"Submitting Processing job: {job_name}")
processor.run(
    job_name=job_name,
    code=f"{S3_CODE.rstrip('/')}/preprocess.py",  # <-- S3 URI to the script
    inputs=inputs,
    outputs=outputs,
    arguments=[
        "--target",
        TARGET,
        "--test-size",
        str(TEST_P),
        "--val-size",
        str(VAL_P),
        "--random-state",
        str(SEED),
    ],
    wait=True,  # stream logs until the job finishes
    logs=True,  # show CloudWatch logs in your terminal
)
print("Processing job finished.")

What happens when you run this

  • SageMaker launches a container from the managed scikit-learn image.
  • Your script runs inside the container; the CSVs in S3 are visible at /opt/ml/processing/input/.
  • When it finishes, the container’s output folders are synced back to your S3 prefixes.

4.2 Run it

Open a terminal in VS Code (Remote-SSH → your Dev Box):

cd ~/mlops-day2
source .venv/bin/activate
python scripts/submit_processing.py

Success signals you’ll see

  • Logs showing your script’s print() messages (e.g., “Processing complete”).
  • A final line that the job finished Completed.
  • In S3 you should now have:
    • $S3_DATA_PROCESSED/{train,val,test}/…
    • $S3_ART_PREPROCESS/artifacts/preprocess.joblib
    • $S3_ART_PREPROCESS/report/report.json

Quick verification:

aws s3 ls "$S3_DATA_PROCESSED/train/" --recursive | head
aws s3 ls "$S3_DATA_PROCESSED/val/"   --recursive | head
aws s3 ls "$S3_DATA_PROCESSED/test/"  --recursive | head

aws s3 ls "$S3_ART_PREPROCESS/artifacts/" --recursive
aws s3 ls "$S3_ART_PREPROCESS/report/"    --recursive

4.3a (Optional) Find the job in the console

Open Amazon SageMaker → Processing jobs and look for a name like stu01-preprocess-YYYYMMDD-HHMMSS. Click it to see inputs/outputs and CloudWatch logs.

4.3b (Optional) View the artifacts created in your S3 bucket


4.4 Common issues (and fixes)

  • AccessDenied to S3 or SageMaker
    • Your role is missing permissions. Confirm you exported SM_ROLE_ARN (per-student role) and it has AmazonSageMakerFullAccess + has the bucket-scoped inline S3 policy we added in Step 2 (List on the bucket, Get/Put/Delete on ${BUCKET}/*).
  • Image not found / framework_version error
    • Change framework_version to a nearby value (e.g., "1.3-1"), save, and re-run.
  • Wrong S3 prefix (no such key)
    • Make sure Step 1 put the dataset in $S3_DATA_RAW/telco/ and the file(s) are visible via aws s3 ls.
  • Validation errors in your script
    • Open CloudWatch logs from the job page; the stack trace will point to the exact line. Most common are path typos or unexpected column names.

4.5 Commit and push the submitter (keep the repo clean)

git add scripts/submit_processing.py
pre-commit run --all-files
git commit -m "feat(processing): add submitter for SageMaker Processing job"
git push

If a hook reformats files, stage and commit again:

git add -A
git commit -m "style: apply pre-commit auto-fixes"
git push

You just: Submitted your first SageMaker Processing job, streamed logs from your Dev Box, and materialized clean splits, artifacts, and an EDA report to S3. These are the inputs for the next lab (training with SageMaker + Experiments).


Step 5 — Inspect outputs (S3 + local)

Concept: Treat S3 outputs as immutable artifacts tied to the job run.

# Quick checks (S3 → local/stdout)

# Artifacts bundle + schema
aws s3 ls "$S3_ART_PREPROCESS/artifacts/" --recursive | grep -E 'preprocess\.joblib|columns\.json|schema\.json' || true

# EDA report
aws s3 cp "$S3_ART_PREPROCESS/report/report.json" - | jq .

# Spot-check the processed splits (avoid broken pipe warnings)
aws s3 cp "$S3_DATA_PROCESSED/train/train.csv" /tmp/train.csv
head -n 3 /tmp/train.csv

Troubleshooting

  • “My code ran from the default sagemaker- bucket.”

    You likely passed a local path as the script location. Either run

    aws s3 sync sagemaker/code "$S3_CODE/" and point code= at the S3 URI (as the submitter above does),

    or keep a local path but accept that SageMaker will stage the script in the default bucket.

  • “report.json isn’t under artifacts/”

    With granular prefixes, report.json is under $S3_ART_PREPROCESS/report/ by design.


IAM hint

In this lab we scoped the roles liberally. If you scope the role strictly, you can use these action→resource patterns now that prefixes are explicit:

  • Read raw: arn:aws:s3:::${BUCKET}/data/raw/*
  • Write processed: arn:aws:s3:::${BUCKET}/data/processed/*
  • RW preprocess artifacts/report: arn:aws:s3:::${BUCKET}/artifacts/preprocess/*
  • Read code: arn:aws:s3:::${BUCKET}/${LAB_PREFIX}/code/* (if you set S3_CODE that way)

What was produced

  • $S3_DATA_PROCESSED/{train,val,test}/…csv
  • $S3_ART_PREPROCESS/artifacts/preprocess.joblib
  • $S3_ART_PREPROCESS/artifacts/{columns.json,schema.json}
  • $S3_ART_PREPROCESS/report/report.json

Comprehension checkpoints — model answers

  • Where are Processing inputs and outputs mounted, and why is that helpful?

    Answer: Inputs and outputs are mounted under /opt/ml/processing/* inside the container:

    • Input(s): /opt/ml/processing/input/ (we used raw/ there)
    • Outputs: /opt/ml/processing/train/, /val/, /test/, /artifacts/, /report/

      This fixed layout gives a deterministic I/O contract: your script doesn’t need to know S3 paths. SageMaker automatically syncs these folders to/from S3, making runs portable, auditable, and easy to automate.

  • Why do we fit the preprocessing pipeline only on the train split?

    Answer: To prevent data leakage. Fitting imputers/encoders/scalers on validation or test would leak information about those sets into training, inflating performance. We fit on train, then transform val/test with the already-fit objects.

  • What problem do class weights address?

    Answer: Class imbalance (e.g., far more No than Yes churn). Class weights up-weight the minority class so the loss function doesn’t ignore it, improving recall/precision for the rare class.

  • What objects must we persist to reproduce the exact feature pipeline in training & inference?

    Answer:

    • The fitted preprocessing pipeline (e.g., ColumnTransformer + scaler/encoders) — we save this as preprocess.joblib.
    • A schema/metadata snapshot (feature names/types, target name, any label mapping) — e.g., schema.json.
    • Class weights (if used) — e.g., class_weights.json.
    • Optional but useful: the EDA/report (report.json) and the split datasets (train/val/test.csv) for traceability.

Next

In Lab 5, you’ll train and track a baseline churn model with SageMaker + Experiments, consuming the processed splits and preprocess.joblib.

↑ Back to top


Lab 5: Train a Baseline Model

Logistic Regression


In Lab 4, you cleaned and split data and saved a fitted preprocessing pipeline (preprocess.joblib) plus optional class weights. In this lab you’ll:

  1. Load those artifacts,
  2. Use the already processed features from Lab 4 (numeric f0..fN + label) and keep class-imbalance handling consistent via class weights,
  3. Train a simple baseline model (Logistic Regression),
  4. Evaluate it and save metrics + the trained model.

Why Logistic Regression? It’s a solid “first model” for a binary target (churn Yes/No). It’s fast, interpretable, and gives you a good baseline to beat later.

Objective

  • Train a first-pass model on the processed Telco data.
  • Use the preprocessing artifacts you generated in Lab 4 (one-hot encoder, scaler, etc.).
  • Save a model artifact and metrics to S3 via SageMaker Training.
  • We’re proving the pipeline works end-to-end before we try fancier models.

Prereqs

  • In Lab 4, you cleaned and split data and saved a fitted preprocessing pipeline (preprocess.joblib) plus class weights.
    • $S3_DATA_PROCESSED/{train,val,test}/…
    • $S3_ART_PREPROCESS/artifacts/{preprocess.joblib,columns.json,schema.json}
  • Your SageMaker execution role is set in SM_ROLE_ARN (from Lab 4).
  • You’re in the same repo as before (~/mlops-day2).

Step 1 — Create the training script skeleton

Concept (what & why)

  • Your training code runs inside a container on a compute instance that SageMaker spins up for you.
  • The S3 folders you pass as training “channels” are mounted inside that container and exposed via environment variables:
    • SM_CHANNEL_TRAIN → /opt/ml/input/data/train
    • SM_CHANNEL_VAL → /opt/ml/input/data/val
    • SM_CHANNEL_TEST → /opt/ml/input/data/test
    • SM_CHANNEL_ARTIFACTS → /opt/ml/input/data/artifacts (your preprocess.joblib, class weights)
  • Where you must save:
    • SM_MODEL_DIR → /opt/ml/model (SageMaker uploads this to S3 as your model artifact)
    • SM_OUTPUT_DATA_DIR → /opt/ml/output/data (extra outputs like metrics.json)
You don’t export those yourself; SageMaker sets them inside the container. We’ll read them with a helper.

In VS Code, create sagemaker/code/train.py and paste this top section (we’ll add more in the next steps).

Note: remove any old placeholder.

# sagemaker/code/train.py
# Skeleton for a SageMaker Training entrypoint.
# This script will:
#   1) Read CSVs from training/validation/test channels
#   2) Load preprocessing artifacts (fit in Lab 4)
#   3) Train a baseline Logistic Regression using class weights
#   4) Evaluate on val/test and save metrics
#   5) Save the trained model to /opt/ml/model/

from __future__ import annotations

import argparse
import json
import os
from pathlib import Path

import joblib
import numpy as np
import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import (
    average_precision_score,
    classification_report,
    confusion_matrix,
    roc_auc_score,
)

Step 2 — Create logistic regression training code in sagemaker/code/train.py

2.1 Parse training arguments (student-tunable knobs)

Why: Let students change hyper-parameters without editing code.

# --- args: keep near the top of train.py ---
# ---------------------------
# 1) CLI arguments
# ---------------------------

def parse_args() -> argparse.Namespace:
    """
    Expose hyperparameters and common knobs so we can change behavior
    without editing code. Sensible defaults are provided.
    """
    p = argparse.ArgumentParser()
    p.add_argument("--target", default="Churn", help="Target label column name")
    p.add_argument("--max-iter", type=int, default=200, help="Max iterations for optimizer")
    p.add_argument("-C", "--C", type=float, default=1.0,
                   help="Inverse regularization strength")
    p.add_argument("--penalty", default="l2", choices=["l2"], help="Regularization type")
    p.add_argument(
        "--solver",
        default="lbfgs",
        choices=["lbfgs", "liblinear", "saga"],
        help="Optimizer (lbfgs is a good default for dense OHE features)",
    )
    p.add_argument("--random-state", type=int, default=42, help="Reproducibility")
    p.add_argument(
        "--class-weights",
        default="auto",
        choices=["auto", "balanced", "none"],
        help=(
            "auto: use class_weights from preprocess.joblib if present, "
            "else fallback to sklearn 'balanced'. "
            "balanced: force sklearn 'balanced'. none: do not use class weights."
        ),
    )
    return p.parse_args()

Plain words

  • max_iter: how long to let training run (more = more chance to converge).
  • C: knob for regularization strength (smaller C = stronger regularization = simpler model).
  • penalty: the type of regularization; l2 is the common default.
  • solver: the optimization algorithm; lbfgs is a safe default for dense features.
  • class_weights: handle imbalance. auto = use your artifact weights if present, else scikit-learn’s balanced.

2.2 File-system helper for SageMaker paths

Why: Read the mounted paths SageMaker gives us; also work locally if you dry-run.

# ---------------------------
# 2) IO paths (SageMaker Training)
# ---------------------------

def io_paths_from_env() -> tuple[Path, Path, Path, Path, Path, Path]:
    """
    Resolve Training job channels and output dirs. SageMaker sets these env vars.
    We also provide local defaults so you can dry-run outside SageMaker.
    """
    in_train = Path(os.environ.get("SM_CHANNEL_TRAIN", "/opt/ml/input/data/train"))
    in_val = Path(os.environ.get("SM_CHANNEL_VAL", "/opt/ml/input/data/val"))
    in_test = Path(os.environ.get("SM_CHANNEL_TEST", "/opt/ml/input/data/test"))
    in_art = Path(os.environ.get("SM_CHANNEL_ARTIFACTS", "/opt/ml/input/data/artifacts"))

    model_dir = Path(os.environ.get("SM_MODEL_DIR", "/opt/ml/model"))
    out_dir = Path(os.environ.get("SM_OUTPUT_DATA_DIR", "/opt/ml/output/data"))

    for d in [model_dir, out_dir]:
        d.mkdir(parents=True, exist_ok=True)

    return in_train, in_val, in_test, in_art, model_dir, out_dir

Plain words

  • Inside SageMaker: those env vars are set automatically.
  • Locally: the fallbacks let you unit-test pieces on your laptop or dev box.
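The env-var-with-fallback pattern in isolation (the `./local/train` folder is hypothetical, just to show the two code paths):

```python
import os
from pathlib import Path

# Outside SageMaker the env var is unset, so the local default wins.
os.environ.pop("SM_CHANNEL_TRAIN", None)
local_path = Path(os.environ.get("SM_CHANNEL_TRAIN", "./local/train"))

# Inside the training container SageMaker sets the env var, so it wins.
os.environ["SM_CHANNEL_TRAIN"] = "/opt/ml/input/data/train"
mounted_path = Path(os.environ.get("SM_CHANNEL_TRAIN", "./local/train"))

print(local_path, mounted_path)
```

Same one-liner, two environments: your laptop dry-run reads local folders, the real job reads the mounted channels, with no code changes.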

2.3 Read CSVs and split features/target

Why: processed CSVs don’t have "Churn" anymore. They have label and f0..fN. We must read columns.json/schema.json to know the exact column order and the label name.

# ---------------------------
# 3) Schema & CSV loading helpers (for processed splits)
# ---------------------------
# Assumes: import json, pandas as pd, and from pathlib import Path are already at top.

def read_schema_columns(art_dir: Path) -> tuple[list[str], str, dict[str, int]]:
    """
    Read columns.json/schema.json if present to discover feature order, 
    label name, and mapping.
    """
    feature_cols: list[str] = []
    label_name: str = "label"
    label_mapping: dict[str, int] = {"No": 0, "Yes": 1}

    cols_file = art_dir / "columns.json"
    sch_file = art_dir / "schema.json"

    if cols_file.exists():
        obj = json.loads(cols_file.read_text())
        cols = obj.get("columns", [])
        if isinstance(cols, list) and cols:
            # Our convention is f0..fN + "label"
            if cols[-1] == "label":
                feature_cols = [c for c in cols if c != "label"]
                label_name = "label"

    if sch_file.exists():
        sch = json.loads(sch_file.read_text())
        label_name = sch.get("label_column", label_name)
        label_mapping = sch.get("label_mapping", label_mapping)

    return feature_cols, label_name, label_mapping


def read_all_csvs(folder: Path) -> pd.DataFrame:
    """Load one or more CSV files from a folder and concat them."""
    files: list[Path] = sorted(folder.glob("*.csv"))
    if not files:
        raise FileNotFoundError(f"No CSV files under {folder}")
    frames = [pd.read_csv(f) for f in files]
    return pd.concat(frames, axis=0, ignore_index=True)


def load_processed_split(
    folder: Path, feature_cols: list[str], label_name: str
) -> tuple[pd.DataFrame, pd.Series]:
    """
    Load a processed split and return (X, y).
    If feature_cols is empty, infer as all columns except label_name
    (or fall back to "all but last" if label_name missing).
    """
    df = read_all_csvs(folder)

    if not feature_cols:
        if label_name in df.columns:
            feature_cols = [c for c in df.columns if c != label_name]
        else:
            # Fallback: assume last column is label
            feature_cols = df.columns[:-1].tolist()
            label_name = df.columns[-1]

    if label_name not in df.columns:
        raise KeyError(
            f"Label column '{label_name}' not found in {folder}. "
            f"Available: {list(df.columns)[:8]}..."
        )

    X = df[feature_cols].astype("float64")
    y = df[label_name].astype("int64")
    return X, y
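
To sanity-check the concat-all-CSVs pattern locally, a self-contained sketch (temporary shard files with made-up data, mimicking what a Processing job might emit):

```python
import tempfile
from pathlib import Path

import pandas as pd

with tempfile.TemporaryDirectory() as td:
    folder = Path(td)
    # Two hypothetical shards of one split.
    pd.DataFrame({"f0": [0.1, 0.2], "label": [0, 1]}).to_csv(folder / "part-0.csv", index=False)
    pd.DataFrame({"f0": [0.3], "label": [0]}).to_csv(folder / "part-1.csv", index=False)

    # Same pattern as read_all_csvs(): sorted glob, read, concat.
    files = sorted(folder.glob("*.csv"))
    df = pd.concat([pd.read_csv(f) for f in files], axis=0, ignore_index=True)
    print(len(df))  # 3 rows across both shards
```

Sorting the glob keeps row order deterministic across runs, which matters for reproducible training.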

2.4 Load your fitted preprocessing pipeline and class weights

Why: class weights live inside preprocess.joblib; we don’t need a separate class_weights.json. We also don’t re-apply transforms here because the splits are already numeric.

# ---------------------------
# 4) Load bundle (preprocess + metadata + class weights)
# ---------------------------

def load_artifacts_bundle(art_dir: Path) -> dict:
    """
    Load the single bundle produced by preprocessing.
    Expected keys include:
      - 'preprocessor' (fitted ColumnTransformer)
      - 'scaler'
      - 'class_weights' (dict like {0: w0, 1: w1})
      - 'feature_names' / 'feature_count' (best effort)
    """
    path = art_dir / "preprocess.joblib"
    if not path.exists():
        raise FileNotFoundError(f"Missing {path}. Run Lab 4 to generate preprocessing artifacts.")
    return joblib.load(path)

Plain words

  • preprocess.joblib = ColumnTransformer with imputers/encoders/scaler you fit on train, plus metadata such as class weights.
  • class_weights (inside the bundle) = tells training to care more about the minority churn class.

2.5 Check and enforce DTypes and set labels to 0/1

Why: features are already transformed (dense numerics). We only ensure dtypes and that labels are 0/1.

# ---------------------------
# 5) Light type guards (processed data)
# ---------------------------

def ensure_numeric(X: pd.DataFrame) -> np.ndarray:
    arr = X.astype("float64").to_numpy()
    return arr

def ensure_binary_int(y: pd.Series) -> np.ndarray:
    # Processed splits already have 0/1 in 'label', but be strict about dtype.
    yy = pd.to_numeric(y, errors="raise").astype("int64")
    if set(pd.unique(yy)) - {0, 1}:
        raise ValueError("Label must be binary 0/1 in processed splits.")
    return yy.to_numpy()

Plain words:

  • Models need numbers, not strings. The processed label column is already 0/1; we just enforce the dtype and fail fast if anything else slipped in.

2.6 Train Logistic Regression (with class weight choices)

What is Logistic Regression?

A linear model that estimates probability of churn (0–1). It’s simple and a great baseline for imbalanced, binary targets.
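The “weighted sum through a sigmoid” idea in a few lines (weights, bias, and feature values below are illustrative, not learned from the lab data):

```python
import numpy as np

def sigmoid(z: float) -> float:
    # Maps any real-valued score to a probability in (0, 1).
    return 1.0 / (1.0 + np.exp(-z))

w = np.array([0.8, -0.5])   # hypothetical learned weights
b = -0.2                    # hypothetical bias
x = np.array([1.0, 2.0])    # one row of features

z = w @ x + b               # 0.8*1.0 - 0.5*2.0 - 0.2 = -0.4
p_churn = sigmoid(z)
print(round(p_churn, 3))    # sigmoid(-0.4) ≈ 0.401
```

Training just searches for the `w` and `b` that make these probabilities match the observed labels, with regularization shrinking the weights.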

# ---------------------------
# 6) Train a baseline Logistic Regression
# ---------------------------

def pick_class_weight(arg_choice: str, artifact_weights: dict | None):
    """
    arg_choice:
      - 'auto': use artifact weights if available, else sklearn 'balanced'
      - 'balanced': always use sklearn 'balanced'
      - 'none': do not use class weights
    """
    if arg_choice == "auto":
        return artifact_weights or "balanced"
    if arg_choice == "balanced":
        return "balanced"
    return None  # 'none'

def fit_logistic_regression(X_train, y_train, args, class_weights):
    cw = pick_class_weight(args.class_weights, class_weights)
    model = LogisticRegression(
        max_iter=args.max_iter,
        C=args.C,
        penalty=args.penalty,
        solver=args.solver,      # 'lbfgs' is a good default for dense inputs
        class_weight=cw,
        random_state=args.random_state,
    )
    model.fit(X_train, y_train)
    return model

Plain words about the knobs

  • C: smaller → stronger regularization → simpler model (often better generalization).
  • solver: the optimizer; lbfgs is robust for dense inputs; liblinear ok for small data; saga supports sparse.
  • class_weight: helps with class imbalance so “Yes” cases aren’t ignored.
  • If you pick auto, we’ll use the weights you computed from the training split in Lab 4 (best), or fall back to sklearn’s balanced.

2.7 Evaluate on train/val/test (and what the numbers mean)

We’ll compute:

  • ROC AUC (0.5–1.0): higher is better. Probability-based; insensitive to class prevalence.
  • PR AUC (Average Precision): focuses on the positive class; very useful when “Yes” is rare.
  • Confusion matrix: counts of TP/FP/TN/FN at the 0.5 threshold.
  • Classification report: per-class precision/recall/F1.
# ---------------------------
# 7) Evaluation helpers
# ---------------------------

def evaluate_split(model, X, y, split_name: str) -> dict:
    """
    Compute common metrics. For a first pass we capture:
      - ROC AUC (area under ROC curve)
      - PR AUC (average precision = area under Precision-Recall)
      - Confusion matrix
      - Classification report (per-class precision/recall/F1)
    """
    # Metric functions are already imported at the top of train.py.

    proba = model.predict_proba(X)[:, 1]
    pred = (proba >= 0.5).astype(int)

    return {
        "split": split_name,
        "roc_auc": float(roc_auc_score(y, proba)),
        "pr_auc": float(average_precision_score(y, proba)),
        "confusion_matrix": confusion_matrix(y, pred).tolist(),
        "classification_report": classification_report(y, pred, output_dict=True),
    }

Plain words

  • Why three splits?
    • Train: what the model learned on.
    • Val: tune hyper-parameters without peeking at the test.
    • Test: untouched, final “exam” score.

2.8 Put it all together in main() and write outputs

SageMaker will automatically upload:

  • whatever you write to /opt/ml/model as the model artifact, and
  • whatever you write to /opt/ml/output/data as job output data.
# ---------------------------
# 8) Main
# ---------------------------

def resolve_class_weight(mode: str, y_train: pd.Series, bundle: dict | None) -> dict[int, float] | None:
    """
    - 'none'     -> None
    - 'balanced' -> sklearn-computed from y_train
    - 'auto'     -> bundle['class_weights'] if present else sklearn 'balanced'
    """
    from sklearn.utils.class_weight import compute_class_weight
    if mode == "none":
        return None
    if mode == "balanced":
        classes = np.sort(y_train.unique())
        weights = compute_class_weight("balanced", classes=classes, y=y_train.to_numpy())
        return {int(c): float(w) for c, w in zip(classes, weights)}
    # auto
    if bundle and "class_weights" in bundle:
        cw = bundle["class_weights"]
        return {int(k): float(v) for k, v in cw.items()}
    classes = np.sort(y_train.unique())
    weights = compute_class_weight("balanced", classes=classes, y=y_train.to_numpy())
    return {int(c): float(w) for c, w in zip(classes, weights)}


def main():
    args = parse_args()
    in_train, in_val, in_test, in_art, model_dir, out_dir = io_paths_from_env()

    # 1) Read schema & columns (so we know features and label names)
    feature_cols, label_name, _ = read_schema_columns(in_art)

    # 2) Load processed splits
    Xtr_df, ytr_s = load_processed_split(in_train, feature_cols, label_name)
    Xva_df, yva_s = load_processed_split(in_val,   feature_cols, label_name)
    Xte_df, yte_s = load_processed_split(in_test,  feature_cols, label_name)

    # 3) Enforce numeric types (already numeric, but be explicit)
    Xtr, Xva, Xte = ensure_numeric(Xtr_df), ensure_numeric(Xva_df), ensure_numeric(Xte_df)
    ytr, yva, yte = ensure_binary_int(ytr_s), ensure_binary_int(yva_s), ensure_binary_int(yte_s)

    # 4) Load artifacts bundle (for class weights & for future inference packaging)
    bundle = load_artifacts_bundle(in_art)
    class_weight = resolve_class_weight(args.class_weights, ytr_s, bundle)

    # 5) Train
    model = LogisticRegression(
        C=args.C,
        penalty=args.penalty,
        solver=args.solver,
        max_iter=args.max_iter,
        random_state=args.random_state,
        class_weight=class_weight,
    )
    model.fit(Xtr, ytr)

    # 6) Evaluate
    from sklearn.metrics import (
        accuracy_score,
        average_precision_score,
        f1_score,
        roc_auc_score,
        precision_recall_fscore_support,
    )

    proba_va = model.predict_proba(Xva)[:, 1]
    pred_va  = (proba_va >= 0.5).astype("int64")
    prec, rec, f1, _ = precision_recall_fscore_support(yva, pred_va, average="binary", zero_division=0)

    metrics = {
        "val/roc_auc": float(roc_auc_score(yva, proba_va)),
        "val/pr_auc":  float(average_precision_score(yva, proba_va)),
        "val/accuracy": float(accuracy_score(yva, pred_va)),
        "val/f1": float(f1),
        "val/precision": float(prec),
        "val/recall": float(rec),
    }

    proba_te = model.predict_proba(Xte)[:, 1]
    pred_te  = (proba_te >= 0.5).astype("int64")
    metrics.update({
        "test/roc_auc": float(roc_auc_score(yte, proba_te)),
        "test/pr_auc":  float(average_precision_score(yte, proba_te)),
        "test/accuracy": float(accuracy_score(yte, pred_te)),
        "test/f1": float(f1_score(yte, pred_te)),
    })

    # 7) Save metrics and model
    out_dir.mkdir(parents=True, exist_ok=True)
    (out_dir / "metrics.json").write_text(json.dumps(metrics, indent=2))
    # also drop a copy next to the model artifact
    (model_dir / "metrics.json").write_text(json.dumps(metrics, indent=2))

    # (Nice for later inference labs): ship the preprocessor in the same artifact
    to_save = {"model": model}
    if "preprocessor" in bundle:
        to_save["preprocess"] = bundle["preprocessor"]
    joblib.dump(to_save, model_dir / "model.joblib")

    print("=== Training complete ===")
    print(json.dumps(metrics, indent=2))
    print("Saved:", model_dir / "model.joblib", "and", out_dir / "metrics.json")


if __name__ == "__main__":
    main()

Plain words: what happens when the job runs

  1. SageMaker mounts your S3 prefixes into the container and sets the SM_* env vars.
  2. The script loads the train/val/test CSVs and the preprocessing artifact from Lab 4.
  3. It enforces numeric features and 0/1 labels, fits LogisticRegression, evaluates, and writes:
    • metrics.json → /opt/ml/output/data (SageMaker uploads it next to the job output in S3)
    • model.joblib → /opt/ml/model (SageMaker uploads it to the Model Artifacts S3 path)

Why it matters

  • Teams can later download metrics.json to compare runs.
  • Bundling the preprocessor with the model makes inference robust: the same transforms get applied to live data.
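
Why bundling helps, as a round-trip sketch: fit a tiny scaler + model, save them together, reload, and score raw input (toy data, not the lab artifact):

```python
import tempfile
from pathlib import Path

import joblib
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler

X_raw = np.array([[1.0], [2.0], [3.0], [4.0]])
y = np.array([0, 0, 1, 1])

scaler = StandardScaler().fit(X_raw)
model = LogisticRegression().fit(scaler.transform(X_raw), y)

with tempfile.TemporaryDirectory() as td:
    path = Path(td) / "model.joblib"
    joblib.dump({"model": model, "preprocess": scaler}, path)

    bundle = joblib.load(path)
    # Inference applies the SAME transform the model was trained on.
    X_new = bundle["preprocess"].transform([[3.5]])
    proba = bundle["model"].predict_proba(X_new)[:, 1]
    print(float(proba[0]))  # high probability: 3.5 sits near the "1" examples
```

Because the fitted scaler travels with the model, an inference service can never accidentally apply different preprocessing than training did.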

Step 3 — Submit the training job (with clear channel mapping)

Create scripts/submit_training.py:

# scripts/submit_training.py
import os
from datetime import datetime

import argparse

import boto3
import sagemaker
from sagemaker.inputs import TrainingInput
from sagemaker.sklearn.estimator import SKLearn

# 0) Helper function to parse the job submit parameters.
# These parameters are parsed when we submit the job from our DevBox.
# We wrap it in SKLearn estimator and pass it to SageMaker

def parse_submit_args():
    p = argparse.ArgumentParser()
    p.add_argument("--target", default="Churn")
    p.add_argument("--max-iter", type=int, default=200)
    p.add_argument("-C", "--C", type=float, default=1.0)
    p.add_argument("--penalty", default="l2", choices=["l2"])
    p.add_argument("--solver", default="lbfgs", choices=["lbfgs", "liblinear", "saga"])
    p.add_argument("--class-weights", default="auto", choices=["auto", "balanced", "none"])
    p.add_argument("--random-state", type=int, default=42)
    return p.parse_args()

args = parse_submit_args()

# 1) Env (from ~/mlops-env.sh)
REGION = os.environ.get("AWS_REGION", "ap-northeast-2")
ROLE = os.environ["SM_ROLE_ARN"]
LABP = os.environ.get("LAB_PREFIX", "student")
S3_DATA_PROCESSED = os.environ["S3_DATA_PROCESSED"]  # .../data/processed
S3_ART_PREPROCESS = os.environ["S3_ART_PREPROCESS"]  # .../artifacts/preprocess
S3_ARTIFACTS = os.environ["S3_ARTIFACTS"]  # .../artifacts

# 2) Session
boto_sess = boto3.Session(region_name=REGION)
sm_sess = sagemaker.Session(boto_session=boto_sess)

# 3) Estimator
est = SKLearn(
    entry_point="sagemaker/code/train.py",
    role=ROLE,
    instance_type="ml.m5.large",
    instance_count=1,
    framework_version="1.2-1",
    py_version="py3",
    sagemaker_session=sm_sess,
    base_job_name=f"{LABP}-train",
    # Ensure artifacts go to YOUR bucket/prefix
    output_path=f"{S3_ARTIFACTS}/training/",
    code_location=f"{S3_ARTIFACTS}/code/",
    hyperparameters={
        "target": args.target,
        "max-iter": args.max_iter,
        "C": args.C,
        "penalty": args.penalty,
        "solver": args.solver,
        "class-weights": args.class_weights,
        "random-state": args.random_state,
    },
)

# 4) Channels (processed splits + preprocess artifacts)
inputs = {
    "train": TrainingInput(f"{S3_DATA_PROCESSED}/train/"),
    "val": TrainingInput(f"{S3_DATA_PROCESSED}/val/"),
    "test": TrainingInput(f"{S3_DATA_PROCESSED}/test/"),
    "artifacts": TrainingInput(f"{S3_ART_PREPROCESS}/artifacts/"),
}

# 5) Launch
job_name = f"{LABP}-train-{datetime.utcnow().strftime('%Y%m%d-%H%M%S')}"
print("Submitting job:", job_name)
est.fit(inputs=inputs, job_name=job_name, wait=True, logs="All")
print("Training job finished.")

What those “channels” do (very important):

Because your inputs dict uses keys "train", "val", "test", "artifacts", SageMaker sets inside the container:

  Channel key   Env var                  Mounted path
  train         SM_CHANNEL_TRAIN         /opt/ml/input/data/train
  val           SM_CHANNEL_VAL           /opt/ml/input/data/val
  test          SM_CHANNEL_TEST          /opt/ml/input/data/test
  artifacts     SM_CHANNEL_ARTIFACTS     /opt/ml/input/data/artifacts

Your io_paths_from_env() reads those to know where to load from and where to save to.


Step 4 — Run + where to look

From repo root:

# Ensure env
source ~/mlops-env.sh
echo "S3_DATA_PROCESSED=$S3_DATA_PROCESSED"
echo "S3_ART_PREPROCESS=$S3_ART_PREPROCESS"
echo "S3_ARTIFACTS=$S3_ARTIFACTS"
echo "SM_ROLE_ARN=$SM_ROLE_ARN"

# Commit + hooks (recommended loop)
git add sagemaker/code/train.py scripts/submit_training.py
pre-commit run --all-files
git add -A && pre-commit run --all-files
git commit -m "Lab 5: baseline logistic regression using processed data + artifacts"
git push

# Launch training
python scripts/submit_training.py

Where to check results

  • In the SageMaker console → Training jobs → click your job:
    • Artifacts S3: contains model.tar.gz → inside is model.joblib
    • Output data: contains metrics.json
  • Quick CLI peek at metrics:
# Model artifact (model.tar.gz) + output.tar.gz
aws s3 ls "$S3_ARTIFACTS/training/" --recursive | tail

# If you added the 'metrics.json' copy to model dir:
# In our lab, we added to both, so this or the command below,
# both will work fine
aws s3 cp "$(aws s3 ls "$S3_ARTIFACTS/training/" --recursive | awk '/model\.tar\.gz$/{print $4}' | tail -n1 | sed "s|^|s3://$(echo $S3_ARTIFACTS | cut -d/ -f3)/|")" /tmp/model.tgz
tar -tzf /tmp/model.tgz | grep metrics.json
tar -xzf /tmp/model.tgz -C /tmp metrics.json && cat /tmp/metrics.json | jq .

# Otherwise, extract from output.tar.gz:
aws s3 cp "$(aws s3 ls "$S3_ARTIFACTS/training/" --recursive | awk '/output\.tar\.gz$/{print $4}' | tail -n1 | sed "s|^|s3://$(echo $S3_ARTIFACTS | cut -d/ -f3)/|")" /tmp/out.tgz
tar -tzf /tmp/out.tgz
tar -xzf /tmp/out.tgz -C /tmp metrics.json && cat /tmp/metrics.json | jq .

Troubleshooting

  • KeyError: 'label' (or your target column)

    The processed splits must include the label column named in schema.json. Recheck Lab 4 outputs.

  • ValueError: solver 'lbfgs' does not support sparse input

    We already densify. If you removed that, either keep densify or switch to solver="saga".

  • AccessDenied

    Confirm your SM_ROLE_ARN has S3 permissions for the S3_DATA and S3_ARTIFACTS prefixes.

  • Pre-commit complains about imports

    Run ruff check --fix sagemaker/code/train.py (then commit). It’s the fastest fix.


Step 5 — Testing the model

Download the latest model.tar.gz, extract model.joblib, grab one row from the processed test data, and score it. Fetch the model with the CLI, then run the test from a Jupyter Notebook cell.

CLI to fetch artifacts locally

# 1) Model artifact → /tmp/model.joblib
aws s3 cp "$(aws s3 ls "$S3_ARTIFACTS/training/" --recursive \
  | awk '/model\.tar\.gz$/{print $4}' | tail -n1 \
  | sed "s|^|s3://$(echo $S3_ARTIFACTS | cut -d/ -f3)/|")" /tmp/model.tgz
tar -xzf /tmp/model.tgz -C /tmp model.joblib

# 2) One processed split row → /tmp/test.csv
aws s3 cp "$S3_DATA_PROCESSED/test/test.csv" /tmp/test.csv

Python notebook cell to score a single row

  • Make sure the notebook kernel is set to the .venv kernel.
import joblib, pandas as pd, numpy as np

# Load model bundle (we saved {"model": LogisticRegression, "preprocess": optional})
bundle = joblib.load("/tmp/model.joblib")
model = bundle["model"]

# Load processed test split (features already numeric f0..fN + label)
df = pd.read_csv("/tmp/test.csv")
X = df.drop(columns=["label"]).astype("float64").to_numpy()
y = df["label"].astype("int64").to_numpy()

proba = model.predict_proba(X[:1])[:, 1][0]
pred  = int(proba >= 0.5)
print(f"True label: {y[0]}  |  Pred: {pred}  |  p(churn=1): {proba:.4f}")

Gentle theory (for non-experts)

  • Logistic Regression predicts a probability of churn using a weighted sum of features passed through a sigmoid curve.
    • If features are one-hot encoded categories + scaled numbers, logistic regression is a strong, transparent baseline.
  • Regularization (C) keeps the model from over-fitting by shrinking weights.
    • Smaller C → more shrinkage → simpler model (often better on new data).
  • Class imbalance means “Yes” is rarer than “No”.
    • Without class weights, the model can “look good” by mostly predicting “No”. We use balanced/artifact weights to keep it honest.
  • ROC AUC vs PR AUC
    • ROC AUC: good all-around; less sensitive to imbalance.
    • PR AUC (Average Precision): focuses on the positive class; especially useful when positives are rare (which is churn).
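
The gap between the two is easy to see on a toy imbalanced sample (synthetic scores, illustration only):

```python
import numpy as np
from sklearn.metrics import average_precision_score, roc_auc_score

# 95 negatives, 5 positives; the model ranks positives highly but not perfectly.
y = np.array([0] * 95 + [1] * 5)
scores = np.concatenate([
    np.linspace(0.0, 0.6, 95),              # negatives
    np.array([0.5, 0.7, 0.8, 0.9, 0.95]),   # positives, one mid-ranked
])

print("ROC AUC:", round(roc_auc_score(y, scores), 3))
print("PR AUC :", round(average_precision_score(y, scores), 3))
# PR AUC lands noticeably below ROC AUC: the one mid-ranked positive (0.5)
# costs little ROC area but dents precision badly at high recall.
```

On imbalanced churn data this is why PR AUC is the more honest single number for the rare “Yes” class.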

(Optional) Mini assignment prompts

  • Change C to 0.3, 1.0, 3.0. Which direction do PR AUC and ROC AUC move? Why?
  • Try class-weights=balanced vs none. What happens to recall on the churn class?
  • Switch solver to saga. What changes (and why might that help if you keep features sparse)?
Answers:
python scripts/submit_training.py -C 0.3
python scripts/submit_training.py --class-weights balanced
python scripts/submit_training.py --solver saga --max-iter 500

1) Change C (0.3 → 1.0 → 3.0): what happens to PR AUC and ROC AUC?

  • What C means: it’s the inverse of regularization.

    Smaller C → more regularization (simpler, more “shrunk” model).

    Larger C → less regularization (more flexible model).

  • Typical direction on Telco-like, one-hot + numeric data:
    • C = 0.3 (stronger reg): both PR AUC and ROC AUC usually tick down a bit → mild underfit (weights are too small to separate churners well).
    • C = 1.0 (baseline): near the sweet spot.
    • C = 3.0 (weaker reg): often a tiny uptick or no real change in AUCs; if you push C too high, you can see minor drops from overfitting (rare categories get overweighted).
  • Why PR AUC may move more than ROC AUC: PR AUC focuses on the positive (churn) class and is more sensitive on imbalanced data; ROC AUC is steadier.

TL;DR: as you increase C from 0.3 → 1.0 → 3.0, expect AUCs to rise a little then plateau, with PR AUC showing the clearer improvement until you start overfitting.


2) Class weights: balanced vs none — “what happens to recall on the churn class?”

  • Recall (for churn) = of all true churners, what fraction did we catch? = TP / (TP + FN).
  • balanced tells the loss to care more about the minority “Yes” class.
    • Effect at the default 0.5 threshold: recall ↑ (you catch more churners), often precision ↓ (more false positives), accuracy may drop slightly.
  • none (no weighting): model favors the majority “No” class → recall ↓ for churn, precision ↑.

TL;DR: balanced ⇒ higher recall for churn (at the cost of precision). none ⇒ lower recall.
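
The recall arithmetic on a concrete confusion matrix (made-up counts, just to ground the formula):

```python
# Suppose at the 0.5 threshold the churn class ("Yes" = 1) ends up with:
tp, fn = 40, 60   # churners caught vs. missed
fp = 30           # non-churners flagged by mistake

recall = tp / (tp + fn)        # 40 / 100 = 0.40 -> we catch 40% of churners
precision = tp / (tp + fp)     # 40 / 70  ≈ 0.571

# "balanced" weighting typically converts some FNs into TPs (recall up)
# while adding FPs (precision down).
print(recall, round(precision, 3))
```

With weights on, you might see something like tp=70/fn=30 (recall 0.70) at the cost of more false positives; the exact trade-off depends on your data.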


3) Switch solver to saga — what changes? why help with sparse features?

  • lbfgs: great for dense inputs; fast/stable for L2.
  • saga: handles sparse matrices natively and supports L1/elastic-net. It’s a variance-reduced stochastic method.
  • In our lab we output dense features (sparse_output=False), so metrics won’t materially change—you might just see different convergence speed.
  • If you kept OHE sparse (or had many more features), saga:
    • avoids densifying (saves memory and often time),
    • lets you use L1/elastic-net to zero out weak features.

TL;DR: With dense features, lbfgs is fine. With sparse features or L1/elastic-net, use saga.
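
A sparse-input sketch showing why saga is the natural choice there (synthetic matrix; assumes scipy is installed, which the sklearn stack already requires):

```python
import numpy as np
from scipy.sparse import random as sparse_random
from sklearn.linear_model import LogisticRegression

rng = np.random.default_rng(0)
X = sparse_random(200, 50, density=0.05, random_state=0, format="csr")  # 95% zeros
y = rng.integers(0, 2, size=200)

# saga consumes the CSR matrix directly -- no densifying -- and supports
# penalty="l1", which can zero out weak features entirely.
clf = LogisticRegression(solver="saga", penalty="l1", C=1.0, max_iter=2000).fit(X, y)
print("non-zero weights:", int((clf.coef_ != 0).sum()), "of", clf.coef_.size)
```

With lbfgs you would have to densify first (memory cost) and L1 would not be available at all.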


One-liners to remember

  • C down ⇒ underfit ⇒ AUCs down a bit.

    C up (a bit) ⇒ AUCs up or flat, then risk tiny overfit.

  • balanced class weights raise churn recall; none lowers it.
  • saga shines with sparse inputs or L1/elastic-net; otherwise lbfgs is usually fastest.

↑ Back to top


Lab 6: Evaluate Model & Create Model Card


Why this lab (MLOps context)

Training accuracy alone doesn’t ship value. In production, you need:

  • Decision-ready metrics (ROC AUC, PR AUC, confusion matrix) so non-ML folks can trust what the model does.
  • A threshold that matches business trade-offs (precision vs recall).
  • A portable artifact (e.g., evaluation.json) your SageMaker Pipeline can read to gate the next steps (register/deploy only if quality ≥ bar).
  • A lightweight Model Card for governance: what the model does, how it was trained, and how it performed.

This lab does exactly that on top of the artifacts you produced earlier.


What you’ll build

  • Jupyter Notebook on VS Code to evaluate
  • Visual eval: ROC/PR curves, confusion matrix.
  • Threshold sweep to pick a working operating point.
  • evaluation.json written to S3 (for pipelines).
  • An (optional) SageMaker Model Card (or Markdown fallback) stored in S3.

Prereqs

  • Lab 4 (Processing) & Lab 5 (Training) completed.
  • Shell env loaded: source ~/mlops-env.sh

    Confirm: echo $AWS_REGION $BUCKET $S3_DATA_PROCESSED $S3_ARTIFACTS $LAB_PREFIX

You’ll run these in a Jupyter notebook inside VS Code. Create notebooks/eval_telco.ipynb.

0) Setup & sanity check

  • Create notebooks/eval_telco.ipynb
    • make sure you create a new Jupyter Notebook
    • attach the .venv (Python 3.9.x) kernel you created in Lab 1

Goal: import libraries, bind environment variables, and create AWS clients used throughout.

MLOps tie-in: all paths come from env vars so your code is relocatable and pipeline-friendly.

# --- imports
import os, io, json, tarfile, tempfile
from pathlib import Path
from datetime import datetime

import boto3
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.metrics import (
    roc_curve, auc, precision_recall_curve,
    classification_report, confusion_matrix,
    roc_auc_score, average_precision_score
)
import joblib

pd.set_option("display.max_columns", 120)

# --- environment (loaded from ~/mlops-env.sh)
REGION  = os.environ.get("AWS_REGION", "ap-northeast-2")
BUCKET  = os.environ["BUCKET"]
S3_DATA_PROCESSED = os.environ["S3_DATA_PROCESSED"]   # e.g., s3://.../data/processed
S3_ARTIFACTS      = os.environ["S3_ARTIFACTS"]        # e.g., s3://.../artifacts
LABP   = os.environ.get("LAB_PREFIX", "student")

# --- AWS clients
boto_sess = boto3.Session(region_name=REGION)
s3 = boto_sess.client("s3")
sm = boto_sess.client("sagemaker")

def parse_s3(uri: str):
    """Split s3://bucket/key into (bucket, key)."""
    assert uri.startswith("s3://"), f"Not an s3 uri: {uri}"
    b, k = uri[5:].split("/", 1)
    return b, k

1) Find the latest training job & read metrics.json (if present)

Goal: grab your most recent training job by name pattern and pull any precomputed metrics.

MLOps tie-in: in pipelines you’ll pass the job name directly; here we discover it to keep the lab simple.

# --- list & pick the latest training job with our prefix
resp = sm.list_training_jobs(SortBy="CreationTime", SortOrder="Descending", MaxResults=20)
jobs = [j for j in resp["TrainingJobSummaries"] if j["TrainingJobName"].startswith(f"{LABP}-train-")]
if not jobs:
    raise SystemExit("No training jobs found for this LAB_PREFIX. Run Lab 5 first.")
job_name = jobs[0]["TrainingJobName"]
print("Latest training job:", job_name)

# --- describe to get S3 locations
desc = sm.describe_training_job(TrainingJobName=job_name)
model_art  = desc["ModelArtifacts"]["S3ModelArtifacts"]      # s3://.../model.tar.gz
output_pre = desc["OutputDataConfig"]["S3OutputPath"]        # s3://.../artifacts/training/
output_tar = f"{output_pre.rstrip('/')}/{job_name}/output/output.tar.gz"

# --- try to fetch metrics.json from output.tar.gz (nice to have; we’ll still recompute)
metrics = None
try:
    b_out, k_out = parse_s3(output_tar)
    with tempfile.TemporaryDirectory() as td:
        local_tar = Path(td) / "output.tar.gz"
        s3.download_file(b_out, k_out, str(local_tar))
        with tarfile.open(local_tar) as t:
            names = [m.name for m in t.getmembers()]
            if "metrics.json" in names:
                metrics = json.load(t.extractfile("metrics.json"))
except Exception as e:
    print("[WARN] Could not read metrics.json from output tar:", e)

if metrics:
    print("metrics.json:", json.dumps(metrics, indent=2)[:400])
else:
    print("metrics.json: not found in output tar")

2) Load the trained model & the processed test split

Goal: load model.joblib and the test CSV (f0..fN + label) to compute fresh metrics/plots.

MLOps tie-in: test metrics are what you’ll gate on in pipelines.

# --- load model.joblib from model.tar.gz
b_mod, k_mod = parse_s3(model_art)
with tempfile.TemporaryDirectory() as td:
    local_tar = Path(td) / "model.tar.gz"
    s3.download_file(b_mod, k_mod, str(local_tar))
    with tarfile.open(local_tar) as t:
        member = next((m for m in t.getmembers() if m.name.endswith("model.joblib")), None)
        if not member:
            raise FileNotFoundError("model.joblib not found in model artifact")
        model_bundle = joblib.load(t.extractfile(member))

model = model_bundle["model"]              # scikit-learn LogisticRegression
preprocess = model_bundle.get("preprocess")  # may be None; splits are already numeric

# --- read processed test split
def s3_read_csv(uri: str) -> pd.DataFrame:
    b, k = parse_s3(uri)
    obj = s3.get_object(Bucket=b, Key=k)
    return pd.read_csv(io.BytesIO(obj["Body"].read()))

test_uri = f"{S3_DATA_PROCESSED.rstrip('/')}/test/test.csv"
df_test = s3_read_csv(test_uri)

feature_cols = [c for c in df_test.columns if c != "label"]
X_test = df_test[feature_cols].astype("float64").to_numpy()
y_test = df_test["label"].astype("int64").to_numpy()

X_test.shape, y_test.shape


3) Evaluate at the default threshold (0.5) + draw curves

Goal: compute easy-to-explain scores and visuals (ROC/PR) that stakeholders understand.

MLOps tie-in: These are the core acceptance metrics stored in evaluation.json.

# --- Evaluate at the default threshold (0.5) and plot side-by-side
from sklearn.metrics import (
    roc_auc_score, average_precision_score, confusion_matrix, classification_report,
    roc_curve, precision_recall_curve, auc
)
import matplotlib.pyplot as plt
import numpy as np

# 1) Probabilities and default 0.5 decision
proba = model.predict_proba(X_test)[:, 1]
pred  = (proba >= 0.5).astype(int)

# 2) Scalar scores + confusion matrix
roc = roc_auc_score(y_test, proba)                 # ranking quality (threshold-free)
pr  = average_precision_score(y_test, proba)       # area under PR curve (a.k.a. AP)
cm  = confusion_matrix(y_test, pred)
rep = classification_report(y_test, pred, output_dict=True)

print(f"Test ROC AUC: {roc:.3f} | Test PR AUC: {pr:.3f}")
print("Confusion matrix @0.5 [TN FP; FN TP]:\n", cm)

# 3) Curves (side-by-side)
fpr, tpr, _ = roc_curve(y_test, proba)
prec, rec, _ = precision_recall_curve(y_test, proba)
roc_auc = auc(fpr, tpr)                            # equals roc above
ap = pr                                            # alias for clarity
base_prec = y_test.mean()                          # prevalence baseline for PR

fig, axes = plt.subplots(1, 2, figsize=(11, 4))

# Left: ROC
axes[0].plot(fpr, tpr, label=f"AUC = {roc_auc:.3f}")
axes[0].plot([0, 1], [0, 1], "--", linewidth=1)
axes[0].set_title("ROC Curve (Test)")
axes[0].set_xlabel("False Positive Rate (1 - Specificity)")
axes[0].set_ylabel("True Positive Rate (Recall)")
axes[0].legend(loc="lower right")

# Right: Precision–Recall
axes[1].plot(rec, prec, label=f"AP = {ap:.3f}")
axes[1].axhline(base_prec, linestyle="--", linewidth=1, label=f"Baseline = {base_prec:.3f}")
axes[1].set_title("Precision–Recall Curve (Test)")
axes[1].set_xlabel("Recall")
axes[1].set_ylabel("Precision")
axes[1].legend(loc="lower left")

fig.tight_layout()
plt.show()


What you’re looking at:

  • Confusion matrix @ 0.5

    The concrete counts you’d feel in operations:

    • TP (true positives): churners correctly flagged
    • FP (false positives): non-churners flagged by mistake
    • FN (false negatives): churners missed
    • TN (true negatives): non-churners correctly ignored
  • ROC curve (left) = How well the model ranks churners above non-churners, ignoring any single threshold.

    Each point is a different threshold. Higher and more “bowed” to the top-left is better.

    ROC AUC is the area under that curve. A handy intuition: it’s the probability that the model assigns a higher score to a random churner than to a random non-churner.

    • Good when: classes are fairly balanced or you want a threshold-free view of ranking quality.
  • Precision–Recall curve (right) = Trade-off between “how many of the flagged users are truly churners” (precision) and “how many churners did we catch” (recall).

    The dashed horizontal line is the baseline precision equal to churn rate (prevalence). Your curve should sit above it.

    PR AUC / AP summarizes the whole curve—higher is better.

    • Great when: churners are rare (class imbalance). It focuses on the positive class.
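That pairwise-probability reading of ROC AUC can be checked by brute force on toy scores (a quick sketch; the numbers are made up, not the Telco data):

```python
# ROC AUC as a pairwise ranking probability (toy, hand-picked scores)
pos = [0.9, 0.7, 0.6]        # scores the model gave to actual churners
neg = [0.8, 0.4, 0.3, 0.2]   # scores it gave to non-churners

# Count positive/negative pairs where the positive is ranked higher (ties count half)
wins = sum(1.0 if p > n else 0.5 if p == n else 0.0 for p in pos for n in neg)
auc_pairwise = wins / (len(pos) * len(neg))
print(round(auc_pairwise, 3))  # 10 of 12 pairs ranked correctly -> 0.833
```

Running roc_auc_score on the same scores would return the same number; the curve area and the ranking probability are two views of one quantity.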

Quick refresher: Precision vs. Recall

  • Precision = Of the users we flagged, what fraction were truly churners?

    “How clean are our alerts?”

    High precision → few false alarms (FP low).

  • Recall = Of all true churners, what fraction did we catch?

    “How many did we find?”

    High recall → few misses (FN low).

Moving the decision threshold:

  • Higher threshold → stricter: precision ↑, recall ↓ (fewer alerts, cleaner ones).
  • Lower threshold → looser: recall ↑, precision ↓ (catch more, but noisier).
In practice: pick the operating point that fits your business costs (what’s worse: missing a churner, or bugging a happy customer?). The PR curve is great for visualizing that trade-off when churners are scarce.
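The threshold trade-off is easy to see in miniature. A small sketch with made-up scores and labels (not the Telco data):

```python
import numpy as np

# Eight toy predictions: model probability and true label (made up)
proba_demo = np.array([0.95, 0.80, 0.65, 0.55, 0.40, 0.30, 0.20, 0.10])
y_demo     = np.array([1,    1,    0,    1,    0,    1,    0,    0])

def prec_rec(th):
    """Precision and recall at a given decision threshold."""
    pred = (proba_demo >= th).astype(int)
    tp = int(((pred == 1) & (y_demo == 1)).sum())
    fp = int(((pred == 1) & (y_demo == 0)).sum())
    fn = int(((pred == 0) & (y_demo == 1)).sum())
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall    = tp / (tp + fn) if (tp + fn) else 0.0
    return precision, recall

print("strict @0.70:", prec_rec(0.70))  # clean alerts, but misses half the churners
print("loose  @0.25:", prec_rec(0.25))  # catches every churner, with noisier alerts
```

Raising the threshold from 0.25 to 0.70 here moves precision from about 0.67 to 1.0 while recall drops from 1.0 to 0.5, exactly the stricter/looser pattern described above.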

Generic Explanation of ROC AUC and Precision-Recall AUC

The ROC AUC and Precision-Recall AUC are metrics used to evaluate the performance of binary classification models, but they focus on different aspects of performance and are suited for different scenarios. Here's a concise comparison with reasoning:

1. ROC AUC (Receiver Operating Characteristic Area Under the Curve)

  • Definition: ROC AUC measures the area under the ROC curve, which plots the True Positive Rate (TPR) (also called Recall or Sensitivity) against the False Positive Rate (FPR) at various classification thresholds.
    • TPR = TP / (TP + FN) (True Positives / All Actual Positives)
    • FPR = FP / (FP + TN) (False Positives / All Actual Negatives)
  • What it measures: How well the model distinguishes between the positive and negative classes across all thresholds. A higher ROC AUC indicates better separation between classes.
  • Range: 0 to 1 (0.5 = random guessing, 1 = perfect classification).
  • Strengths:
    • Good for balanced datasets where positive and negative classes are roughly equal.
    • Captures overall model performance, considering both TPR and FPR.
  • Weaknesses:
    • Can be misleading for imbalanced datasets (e.g., when positives are rare). A high ROC AUC may occur even if the model struggles to identify the minority class because FPR is dominated by the negative class.
    • Doesn’t directly reflect precision, which is critical in imbalanced scenarios.
  • Use case: General-purpose evaluation when class imbalance isn’t a major issue (e.g., medical diagnostics with balanced data).
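The TPR/FPR formulas above in action, with hypothetical counts:

```python
# TPR and FPR from raw confusion-matrix counts (hypothetical numbers)
TP, FN, FP, TN = 80, 20, 30, 870

tpr = TP / (TP + FN)   # recall/sensitivity: 80 of 100 actual positives caught
fpr = FP / (FP + TN)   # 30 of 900 actual negatives falsely flagged
print(f"TPR={tpr:.3f}  FPR={fpr:.3f}")
```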

2. Precision-Recall AUC (Area Under the Precision-Recall Curve)

  • Definition: Precision-Recall AUC measures the area under the curve that plots Precision against Recall at various thresholds.
    • Precision = TP / (TP + FP) (True Positives / All Predicted Positives)
    • Recall = TP / (TP + FN) (same as TPR)
  • What it measures: How well the model performs in identifying positive cases, focusing on the trade-off between precision (how many selected items are relevant) and recall (how many relevant items are selected).
  • Range: 0 to 1 (baseline depends on the positive class proportion; 1 = perfect performance).
  • Strengths:
    • More informative for imbalanced datasets (e.g., fraud detection, rare disease diagnosis) where the positive class is a small fraction of the data.
    • Focuses on the positive class, ignoring true negatives, which makes it sensitive to performance on the minority class.
  • Weaknesses:
    • Ignores true negatives, so it doesn’t fully assess performance on the negative class.
    • Less intuitive for balanced datasets where ROC AUC is often preferred.
  • Use case: Scenarios with high class imbalance, where correctly identifying positives is critical (e.g., spam detection, anomaly detection).

Key Differences

  • X-Axis: ROC AUC uses False Positive Rate (FPR); PR AUC uses Recall (TPR)
  • Y-Axis: ROC AUC uses True Positive Rate (Recall); PR AUC uses Precision
  • Focus: ROC AUC measures overall class separation (positive vs. negative); PR AUC measures positive-class performance
  • Best for: ROC AUC suits balanced datasets; PR AUC suits imbalanced datasets
  • Sensitivity to imbalance: ROC AUC is less sensitive and can mask poor minority-class performance; PR AUC is highly sensitive to minority-class performance
  • Ignores: ROC AUC ignores precision; PR AUC ignores true negatives

When to Use Which

  • Choose ROC AUC when:
    • The dataset is balanced or you care about overall model performance.
    • False positives and true negatives are equally important (e.g., equal cost for errors).
    • Example: Predicting whether a patient has a disease when positive and negative cases are roughly equal.
  • Choose Precision-Recall AUC when:
    • The dataset is imbalanced (e.g., 1% positives).
    • You prioritize performance on the positive class (e.g., minimizing false positives is critical).
    • Example: Fraud detection, where identifying rare fraudulent transactions is key, and false positives (flagging non-fraud) are costly.

Practical Example

Suppose you’re evaluating a model for detecting defective products (1% defective, 99% non-defective):

  • ROC AUC: Might be high (e.g., 0.9) because the model correctly classifies most non-defective items (large negative class), even if it misses many defects.
  • Precision-Recall AUC: Could be lower (e.g., 0.4) if the model struggles to identify defects (low recall) or flags too many non-defects as defective (low precision). This better reflects performance on the rare positive class.
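The defect-detection scenario can be sanity-checked with back-of-the-envelope arithmetic (all counts here are illustrative):

```python
# At 1% prevalence, a decent-looking ROC operating point still yields low precision
n_pos, n_neg = 10, 990          # 1% defective items
recall, fpr  = 0.8, 0.05        # an operating point that looks fine on a ROC curve

tp = recall * n_pos             # 8 defects caught
fp = fpr * n_neg                # 49.5 non-defects flagged by mistake
precision = tp / (tp + fp)
print(f"precision = {precision:.3f}")  # most flags are false alarms
```

Even with 80% recall and only 5% FPR, roughly six of every seven flagged items are false alarms, which is why the PR view is the honest one here.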

4) Sweep thresholds to pick an operating point

Goal: choose a threshold for business goals (maximize F1, or ensure recall ≥ X, etc).

MLOps tie-in: we’ll store this threshold in evaluation.json, and your pipeline can use it downstream for evaluation or deployment configs.

# --- compute P/R/F1 across thresholds
def sweep_thresholds(y_true, p):
    prec, rec, thr = precision_recall_curve(y_true, p)
    thr = np.r_[0.0, thr]  # align lengths
    f1  = 2 * (prec * rec) / (prec + rec + 1e-12)
    return pd.DataFrame({"threshold": thr, "precision": prec, "recall": rec, "f1": f1})

sweep = sweep_thresholds(y_test, proba)

# --- pick strategy: best F1 (simple balanced choice)
best_idx = int(np.nanargmax(sweep["f1"].values))
t_star   = float(sweep.iloc[best_idx]["threshold"])

# --- visualize the trade-off
plt.figure(figsize=(6,4))
plt.plot(sweep["threshold"], sweep["precision"], label="Precision")
plt.plot(sweep["threshold"], sweep["recall"],    label="Recall")
plt.plot(sweep["threshold"], sweep["f1"],        label="F1")
plt.axvline(t_star, linestyle="--", label=f"Best F1 @ {t_star:.2f}")
plt.xlabel("Threshold"); plt.ylabel("Score"); plt.title("Threshold Sweep (Test)"); plt.legend(); plt.show()

# --- confusion & report at chosen threshold
pred_star = (proba >= t_star).astype(int)
cm_star   = confusion_matrix(y_test, pred_star)
rep_star  = classification_report(y_test, pred_star, output_dict=True)

print("Chosen threshold:", round(t_star, 3))
print("Confusion matrix @t*:\n", cm_star)
pd.DataFrame(rep_star).T.head()

Which threshold to choose?

  • Max F1: a balanced “one number” choice if you don’t have a business preference.
  • Target recall (catch churners): recall only falls as the threshold rises, so pick the highest threshold that still keeps recall ≥ target; that buys the best precision available at that recall.
  • Target precision (avoid false alarms): precision generally rises with the threshold, so pick the lowest threshold that already reaches precision ≥ target; that preserves the most recall.
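These selection rules take only a few lines against the sweep table computed above. A sketch on a tiny made-up sweep (the helper names are ours, not from the lab):

```python
import pandas as pd

# Tiny stand-in for the `sweep` DataFrame computed earlier in this lab
sweep_demo = pd.DataFrame({
    "threshold": [0.2, 0.4, 0.6, 0.8],
    "precision": [0.30, 0.45, 0.60, 0.75],
    "recall":    [0.95, 0.80, 0.55, 0.30],
})

def pick_for_recall(df, target):
    """Highest threshold that still meets the recall target (best precision there)."""
    ok = df[df["recall"] >= target]
    return float(ok["threshold"].max()) if not ok.empty else None

def pick_for_precision(df, target):
    """Lowest threshold that already meets the precision target (best recall there)."""
    ok = df[df["precision"] >= target]
    return float(ok["threshold"].min()) if not ok.empty else None

print(pick_for_recall(sweep_demo, 0.75))     # 0.4
print(pick_for_precision(sweep_demo, 0.50))  # 0.6
```

Swap sweep_demo for your real sweep DataFrame and the chosen value can replace t_star in evaluation.json.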

5) Save a portable evaluation.json to S3

Goal: produce a small, machine-readable file your SageMaker Pipeline can read and make a decision (e.g., if pr_auc >= 0.4 then register/deploy).

MLOps tie-in: this is the gate artifact for your next lab (Pipeline).

evaluation = {
    "job_name": job_name,
    "generated_at": datetime.utcnow().isoformat(timespec="seconds") + "Z",
    "test": {
        "roc_auc": float(roc),
        "pr_auc": float(pr),
        "threshold_default": 0.5,
        "threshold_star": t_star,
        "confusion_matrix@0.5": cm.tolist(),
        "confusion_matrix@t*": cm_star.tolist(),
        "report@0.5": rep,          # per-class precision/recall/f1/support
        "report@t*":  rep_star,
    },
}

print(json.dumps(evaluation, indent=2)[:800], "...\n")

eval_prefix = f"{S3_ARTIFACTS.rstrip('/')}/evaluation/{job_name}/"
b_eval, k_eval = parse_s3(eval_prefix + "evaluation.json")
s3.put_object(Bucket=b_eval, Key=k_eval, Body=json.dumps(evaluation, indent=2).encode("utf-8"))
print("Wrote:", f"{eval_prefix}evaluation.json")


6) (Optional) Create a SageMaker Model Card (or Markdown fallback)

Goal: capture the model summary/metrics in a place governance folks can find.

MLOps tie-in: model cards document what you trained and how it performed—handy for approvals.

# Try the SDK Model Card first; if your region lacks support, use the Markdown fallback below.
try:
    from sagemaker.model_card import ModelCard, ModelCardStatus
    from sagemaker.model_card import (
        ModelOverview, BusinessDetails, IntendedUses, TrainingDetails,
        EvaluationResult, Metric, ObjectiveFunction, Function
    )

    model_card_name = f"{LABP}-telco-churn-{job_name}"

    overview = ModelOverview(
        model_id=model_card_name,
        model_name="Telco Churn - Logistic Regression",
        problem_type="BinaryClassification",
        algorithm_type="LogisticRegression",
    )
    business = BusinessDetails(
        business_problem="Predict churn risk to prioritize retention outreach.",
        business_goals="Improve retention by acting on high-risk customers.",
        stakeholders=["Data Science", "Marketing", "Care Ops"],
    )
    uses = IntendedUses(
        intended_uses=["Batch and real-time scoring of churn probability"],
        factors_affecting_model_efficiency=["Pricing changes", "Plan changes"],
        risk_rating="Low",
    )
    train = TrainingDetails(
        objective_function=ObjectiveFunction(function=Function("binary_classification"), notes="ROC AUC / PR AUC"),
        training_job_details={"training_job_arn": desc["TrainingJobArn"]},
    )
    eval_results = [
        EvaluationResult(name="Test ROC AUC", metric=Metric(name="roc_auc", type="number", value=float(roc))),
        EvaluationResult(name="Test PR AUC",  metric=Metric(name="pr_auc",  type="number", value=float(pr))),
    ]

    mc = ModelCard(
        name=model_card_name,
        status=ModelCardStatus.DRAFT,
        model_overview=overview,
        business_details=business,
        intended_uses=uses,
        training_details=train,
        evaluation_results=eval_results,
    )
    mc.create()
    print("Created/updated Model Card:", model_card_name)

except Exception as e:
    print("[INFO] Falling back to Markdown model card:", e)
    md = f"""# Model Card — Telco Churn (LogReg)

**Training job:** {job_name}
**Generated:** {datetime.utcnow().isoformat(timespec="seconds")}Z

## Overview
Binary classification to predict churn. Algorithm: Logistic Regression.

## Data & Processing
- Inputs: {S3_DATA_PROCESSED} (train/val/test)
- Preprocess: median impute numerics, one-hot encode categoricals, standard-scale numerics

## Key Metrics (Test)
- ROC AUC: **{roc:.3f}**
- PR AUC: **{pr:.3f}**
- Threshold (F1*): **{t_star:.2f}**

## Confusion Matrix @ t*
{cm_star}

## Intended Use & Risks
Retention targeting; monitor drift after pricing/plan changes; avoid using sensitive attributes.
"""
    key = f"{LABP}/artifacts/model-cards/{job_name}/model_card.md"
    s3.put_object(Bucket=BUCKET, Key=key, Body=md.encode("utf-8"))
    print("Wrote s3://{}/{}".format(BUCKET, key))


7) (Optional) Mini “metrics viewer” cell for quick checks later

Goal: a tiny cell you can re-run anytime to peek at the latest numbers without replotting.

print(json.dumps(
    {
        "job": job_name,
        "test": {
            "roc_auc": round(roc, 3),
            "pr_auc": round(pr, 3),
            "best_threshold_f1": round(t_star, 3),
            "cm@t*": cm_star.tolist(),
        },
    },
    indent=2,
))


8) What to write in your lab notes

  • Final ROC AUC and PR AUC on test.
  • The chosen threshold and the why (F1* vs recall/precision target).
  • Paste the S3 URI to evaluation.json and your model card.

CLI equivalents (if you prefer terminal)

# Most recent training job
JOB=$(aws sagemaker list-training-jobs \
  --sort-by CreationTime --sort-order Descending --max-results 1 \
  --query 'TrainingJobSummaries[0].TrainingJobName' --output text)

# Pull output tar and peek at metrics.json
aws s3 cp $S3_ARTIFACTS/training/$JOB/output/output.tar.gz .
tar -tzf output.tar.gz | grep metrics.json && tar -xzf output.tar.gz metrics.json && cat metrics.json | jq .

How this lab fits the MLOps flow

  • Processing (Lab 3) produced clean splits + preprocessing artifacts.
  • Training (Lab 5) built a baseline model and saved raw metrics.
  • This lab turns metrics into decisions (threshold + acceptance) and emits evaluation.json + Model Card.
  • Next labs
    • Realtime endpoint uses the trained model (and optionally the stored threshold).
    • SageMaker Pipeline chains Processing → Training → Evaluation and gates on evaluation.json (e.g., if test.pr_auc ≥ 0.40 then register & deploy).

↑ Back to top


Lab 7: Register your Model in SageMaker Model Registry


Objective

Move beyond “just a model artifact in S3” to a governed, versioned registry where models have:

  • A unique name + version history
  • Attached evaluation metrics (from Lab 6’s evaluation.json)
  • An approval status (e.g., Pending, Approved, Rejected)

This is how production MLOps teams manage which model is the “blessed one” for deployment.


Why this Lab (MLOps context)

  • In Lab 5, you trained a Logistic Regression baseline.
  • In Lab 6, you evaluated it and saved evaluation.json with ROC AUC, PR AUC, thresholds, and confusion matrices.
  • Now in Lab 7, you’ll:
    • Register that model in SageMaker’s Model Registry.
    • Attach the evaluation metrics.
    • Set approval status (so only approved versions are deployed).

📦 Think of the Model Registry as the “artifact store + gatekeeper” in your workflow.


Prerequisites

  • Lab 6 completed (you have evaluation.json in S3).
  • Env vars loaded:
source ~/mlops-env.sh
echo $AWS_REGION $BUCKET $S3_ARTIFACTS $LAB_PREFIX $SM_ROLE_ARN
  • SageMaker Studio is not required; we are using VS Code Jupyter notebooks on the DevBox.

Step 1 — Setup Notebook and Clients

Open a new notebook in VS Code: notebooks/register_model.ipynb.

import os, json, boto3, botocore
from datetime import datetime

# --- Env (from mlops-env.sh) ---
REGION   = os.environ.get("AWS_REGION", "ap-northeast-2")
BUCKET   = os.environ["BUCKET"]
LABP     = os.environ.get("LAB_PREFIX", "student")
SM_ROLE  = os.environ["SM_ROLE_ARN"]
S3_ARTIFACTS  = os.environ["S3_ARTIFACTS"]

boto_sess = boto3.Session(region_name=REGION)
sm = boto_sess.client("sagemaker")
s3 = boto_sess.client("s3")

print("Region:", REGION)
print("Role:", SM_ROLE)

Concept (plain words):

  • We connect to SageMaker in our region.
  • sm lets us call registry APIs; s3 lets us pull the evaluation file.

Step 2 — Locate the Latest Evaluation Artifact

We’ll pull the most recent evaluation.json from Lab 6 outputs.

# Find the newest artifacts/evaluation/<job_name>/evaluation.json
candidate_prefixes = [
    "artifacts/evaluation/",           # canonical layout from Lab 6
    f"{LABP}/artifacts/evaluation/",   # tolerate older/alternate layout
]

candidates = []
paginator = s3.get_paginator("list_objects_v2")
for pref in candidate_prefixes:
    for page in paginator.paginate(Bucket=BUCKET, Prefix=pref):
        for obj in page.get("Contents", []):
            k = obj["Key"]
            if k.endswith("/evaluation.json"):
                candidates.append((obj["LastModified"], k))

if not candidates:
    raise SystemExit("No evaluation.json found under artifacts/evaluation/. Run Lab 6 first.")

# Pick the most recent evaluation.json by LastModified
last_modified, latest_key = max(candidates, key=lambda x: x[0])
print("Latest evaluation.json:", latest_key, "| LastModified:", last_modified)

# Optional: derive job_name (artifacts/evaluation/<job_name>/evaluation.json)
job_name = latest_key.rstrip("/").split("/")[-2]
print("Detected job_name:", job_name)

obj = s3.get_object(Bucket=BUCKET, Key=latest_key)
evaluation = json.loads(obj["Body"].read())
print(json.dumps(evaluation, indent=2)[:500], "...")

Step 3 — Build the Model Package Group (the “product line”)

Concept (what & why)

In SageMaker, a Model Package Group is the long‑lived container for all versions of a model you’ll register over time. Think of it as a product line (e.g., “Telco Churn”), while each Model Package you register is a specific version with its own artifacts, metrics, and approval state.

  • MLOps purpose: Central place to compare model candidates, track lineage, and control promotion (approval) into environments.
  • Plain words: One shelf (the group) that holds many jars (versions) with labels (metrics/notes/approval).

Code (idempotent: creates if missing)

# Name your “product line” once and keep reusing it.
MPG_NAME = f"{LABP}-telco-churn"

def ensure_model_package_group(name: str) -> str:
    """Create (if needed) and return the Model Package Group ARN."""
    try:
        out = sm.describe_model_package_group(ModelPackageGroupName=name)
        print("✓ Found model package group:", out["ModelPackageGroupArn"])
        return out["ModelPackageGroupArn"]
    except botocore.exceptions.ClientError as e:
        if e.response["Error"]["Code"] != "ValidationException":
            raise
        print("… Creating model package group:", name)
        out = sm.create_model_package_group(
            ModelPackageGroupName=name,
            ModelPackageGroupDescription="Telco churn model family (logreg baseline)",
            Tags=[{"Key": "lab", "Value": "mlops-sagemaker"}, {"Key": "owner", "Value": LABP}],
        )
        return out["ModelPackageGroupArn"]

MPG_ARN = ensure_model_package_group(MPG_NAME)

Expected result: You should see either “✓ Found model package group …” or “Creating model package group …” with an ARN.


Step 4 — Register this trained model as a Model Package

Concept (what & why)

A Model Package is a versioned, reviewable record that points at:

  • the model artifact on S3 (from Lab 5),
  • the inference container image that will load/serve it,
  • metrics/explanations (evaluation.json from Lab 6),
  • and an approval status (Pending/Approved/Rejected).

We register with ApprovalStatus="PendingManualApproval" first, then move to Approved after human review (Step 5). This is a quality gate: only Approved versions get deployed downstream.

  • Plain words: We file a “version card” for this model with proof (metrics) attached. It starts as pending until someone signs off.

What we need

  • model_art S3 URI (from Lab 6 Step 1’s desc["ModelArtifacts"]["S3ModelArtifacts"]).
  • An inference image URI. For scikit‑learn, we can use the AWS‑hosted framework image for our region (or your BYOC later in Lab 8).

Code (attach metrics & set pending approval)

# --- Step 4.a — Pick the serving image (framework container) for inference ---
from sagemaker import image_uris

sklearn_image = image_uris.retrieve(
    framework="sklearn",
    region=REGION,
    version="1.2-1",
    image_scope="inference",
    py_version="py3",
)
print("Serving image:", sklearn_image)

# --- Step 4.b — Gather model + metrics ---
# Derive training job name from evaluation.json produced in Lab 6
job_name = evaluation["job_name"]

# Find model.tar.gz S3 location from the training job
desc = sm.describe_training_job(TrainingJobName=job_name)
model_data_url = desc["ModelArtifacts"]["S3ModelArtifacts"]
print("Model artifact S3:", model_data_url)

# Pull test metrics from evaluation.json
test_metrics = evaluation["test"]
pr_auc  = float(test_metrics["pr_auc"])
roc_auc = float(test_metrics["roc_auc"])
threshold_star = float(test_metrics["threshold_star"])

# --- Step 4.c — Create a new Model Package version in the Group (no explicit name) ---
# Required inputs:
#   MPG_NAME        -> your model package group name (string)
#   sklearn_image   -> ECR image URI for sklearn serving
#   model_data_url  -> S3 URI to model.tar.gz (from describe_training_job)
#   S3_ARTIFACTS    -> e.g., s3://.../artifacts  (ensure this is set in env)
#   job_name        -> evaluation["job_name"]
#   pr_auc, roc_auc -> floats parsed from evaluation.json

# Point the metrics at the same evaluation.json you wrote in Lab 6
eval_json_s3 = f"{S3_ARTIFACTS.rstrip('/')}/evaluation/{job_name}/evaluation.json"

model_description = (
    f"Telco churn (LogReg). "
    f"Test ROC AUC={roc_auc:.3f}, PR AUC={pr_auc:.3f}. "
    f"Artifacts from job {job_name}."
)

create_resp = sm.create_model_package(
    ModelPackageGroupName=MPG_NAME,            # only the group (let SM version)
    ModelPackageDescription=model_description,
    InferenceSpecification={
        "Containers": [
            {
                "Image": sklearn_image,
                "ModelDataUrl": model_data_url,
            }
        ],
        "SupportedContentTypes": ["text/csv", "application/json"],
        "SupportedResponseMIMETypes": ["application/json"],
    },
    ModelApprovalStatus="PendingManualApproval",
    ModelMetrics={
        "ModelQuality": {
            "Statistics": {
                "ContentType": "application/json",
                "S3Uri": eval_json_s3,
            }
        }
    },
    # IMPORTANT: Do NOT put Tags here — tags are not supported on versions.
)

model_package_arn = create_resp["ModelPackageArn"]
print("Created Model Package version:", model_package_arn)

# --- Step 4.d — (Optional) Approve immediately (or leave pending for manual gate) ---
# If you want to auto-approve now, do it as a separate call:
# sm.update_model_package(
#     ModelPackageArn=model_package_arn,
#     ModelApprovalStatus="Approved",
#     ApprovalDescription=f"Auto-approved from Lab 7. pr_auc={pr_auc:.3f}, roc_auc={roc_auc:.3f}, t*={threshold_star:.2f}",
# )

Expected result: You should see a new Model Package ARN and, in the console (SageMaker → Model registry), an entry under your group with Pending manual approval status and a link to your evaluation.json.

Why this matters (MLOps):

  • Registration captures what will be deployed (artifact + container), how good it is (metrics), and who approved it (audit trail).
  • Pipelines can stop here if metrics don’t meet the bar, or promote when they do.


Sanity Check:
Let’s check the approval status of the Model Package we created.

resp = sm.describe_model_package(ModelPackageName=model_package_arn)
print("Approval:", resp["ModelApprovalStatus"])
print("Eval metrics S3:", resp["ModelMetrics"]["ModelQuality"]["Statistics"]["S3Uri"])

Step 5 — Approve (or reject) the model version

Concept (what & why)

Approval is the human‑in‑the‑loop step: a person verifies the metrics, data context, and any risk notes before allowing promotion. This is key for governance and change control.

  • Plain words: Green‑light this version so it’s eligible for deployment.

Code (approve with a rationale)

review_comment = (
    f"Approved based on Lab 6 metrics: ROC AUC={roc_auc:.3f}, "
    f"PR AUC={pr_auc:.3f}, threshold*={threshold_star:.2f}. "
    "Meets classroom acceptance criteria."
)

_ = sm.update_model_package(
    ModelPackageArn=model_package_arn,
    ModelApprovalStatus="Approved",  # or "Rejected"
    ApprovalDescription=review_comment,
)

print("✓ Set approval to Approved.")

Expected result: In the console you’ll see the version move from Pending to Approved and the approval note visible in the details pane.

If you want a hard gate: Before approving, compare pr_auc/roc_auc to your bar (e.g., pr_auc >= 0.40). If it misses, set Rejected and record why.
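One way to express that hard gate is a small pure function called just before update_model_package; the 0.40 bar below is the classroom example, not a universal standard:

```python
# Decide approval from metrics before calling update_model_package (sketch)
PR_AUC_BAR = 0.40

def gate_decision(pr_auc_value, bar=PR_AUC_BAR):
    """Return (status, rationale) for the registry approval call."""
    if pr_auc_value >= bar:
        return "Approved", f"pr_auc={pr_auc_value:.3f} meets bar {bar:.2f}"
    return "Rejected", f"pr_auc={pr_auc_value:.3f} below bar {bar:.2f}"

status, note = gate_decision(0.46)  # substitute the real pr_auc from evaluation.json
print(status, "|", note)
# Then:
# sm.update_model_package(ModelPackageArn=model_package_arn,
#                         ModelApprovalStatus=status, ApprovalDescription=note)
```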


Sanity Check:
Let’s check the approval status of the Model Package after approval.

resp = sm.describe_model_package(ModelPackageName=model_package_arn)
print("Approval:", resp["ModelApprovalStatus"])
print("Eval metrics S3:", resp["ModelMetrics"]["ModelQuality"]["Statistics"]["S3Uri"])

Step 6 — Verify & find the latest approved package (programmatically)

Concept (what & why)

We often need “the latest Approved version” in automation (e.g., for deployment or pipelines). Querying the registry by status keeps your scripts deterministic and safe.

  • Plain words: Ask the registry: “what’s the newest version that’s actually approved?”

Code (list and pick latest Approved)

# --- list versions in the group and return the newest with the given status
def latest_model_package_arn(group_name: str, status: str = "Approved"):
    """Return (arn, created_time) of the newest model package with given status."""
    resp = sm.list_model_packages(
        ModelPackageGroupName=group_name,
        ModelApprovalStatus=status,     # "Approved" | "Rejected" | "PendingManualApproval"
        SortBy="CreationTime",
        SortOrder="Descending",
        MaxResults=20,
    )
    pkgs = resp.get("ModelPackageSummaryList", [])
    if not pkgs:
        return None, None
    return pkgs[0]["ModelPackageArn"], pkgs[0]["CreationTime"]

latest_approved_arn, created = latest_model_package_arn(MPG_NAME, "Approved")
print("Latest Approved ARN:", latest_approved_arn, "| Created:", created)

if latest_approved_arn:
    # NOTE: Describe uses ModelPackageName, even when you pass an ARN.
    info = sm.describe_model_package(ModelPackageName=latest_approved_arn)
    print("Approval:", info["ModelApprovalStatus"])
    print("Container Image:", info["InferenceSpecification"]["Containers"][0]["Image"])
    print("ModelDataUrl:", info["InferenceSpecification"]["Containers"][0]["ModelDataUrl"])
    print("Eval JSON:", info["ModelMetrics"]["ModelQuality"]["Statistics"]["S3Uri"])
else:
    print("No Approved packages yet. Approve one in Step 5 and re-run this cell.")

Expected result: You’ll see the Approved package you just set in Step 5. This ARN is exactly what you’ll feed to deployment (Lab 8) or Pipelines (Lab 10).


Mini‑glossary (plain language)

  • Model Package Group: The long‑lived folder for a model family (all versions live here).
  • Model Package: A specific version of a model with artifacts, metrics, and an approval state.
  • Approval: Manual “go/no‑go” switch for promotion to deployment.
  • Model Metrics: Machine‑readable performance stats (we attached evaluation.json) to support gates and reviews.

Why these steps exist in an MLOps flow

  • They create a traceable, auditable chain from training → evaluation → registration → approval.
  • They support team workflows: data scientists produce candidates; reviewers approve; ops deploy only approved versions.
  • They enable automation later: Pipelines can stop or proceed based on metrics and approval, keeping “human intent” in the loop.

Comprehension Checkpoints

  • Q: What is the difference between model.tar.gz in S3 vs the Model Registry?

    A: S3 just stores bytes; the Registry adds versioning, approval, and metrics.

  • Q: Why attach evaluation.json?

    A: So downstream pipelines can gate deploy based on metrics (e.g., deploy only if PR AUC ≥ 0.4).

  • Q: What does “PendingManualApproval” mean?

    A: Human-in-the-loop review; no accidental auto-deploy.
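The metric gate mentioned in the checkpoint above can be sketched as a tiny pure-Python check. This is a sketch, not SageMaker API code; it assumes the evaluation.json layout used in these labs (a "test" section with a "pr_auc" key), and the numbers are illustrative:

```python
import json

def passes_gate(evaluation: dict, min_pr_auc: float = 0.4) -> bool:
    """Return True when the candidate's PR AUC clears the deployment bar."""
    return float(evaluation["test"]["pr_auc"]) >= min_pr_auc

# Example evaluation.json payload (illustrative numbers)
doc = json.loads('{"test": {"pr_auc": 0.47, "roc_auc": 0.83}}')
print(passes_gate(doc))        # → True
print(passes_gate(doc, 0.5))   # → False
```

In Lab 10, a condition step in the pipeline plays exactly this role: proceed to registration/deployment only when the gate passes.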


How This Fits the Workflow

  • Lab 5 → model artifact
  • Lab 6 → evaluation.json (acceptance metrics)
  • Lab 7 → registry entry (governed, versioned, approval)
  • Next:
    • Lab 8 → use this registry entry for Batch inference.
    • Lab 9 → deploy Real-time endpoint.
    • Lab 10 → orchestrate everything in a Pipeline.

Addendum Step: Python File Export

Why this matters (MLOps tie-in)

  • Notebooks are for exploration & teaching — great to step through interactively.
  • Pipelines need Python scripts — they can’t open a .ipynb; they call a .py entrypoint.
  • By exporting our registry logic to sagemaker/code/register.py, we make it reusable in automation. Pipelines (Lab 10) can then invoke it as a step after training & evaluation.

Plain words: notebooks = “learning scratchpad,” Python files = “production building blocks.”


Step A — Create sagemaker/code/register.py

In VS Code, add:

# sagemaker/code/register.py
"""
Register a trained model + evaluation metrics into SageMaker Model Registry.

Assumptions:
  - Env vars set via mlops-env.sh: AWS_REGION, BUCKET, LAB_PREFIX, SM_ROLE_ARN, S3_ARTIFACTS
  - evaluation.json exists at s3://<BUCKET>/artifacts/evaluation/<job_name>/evaluation.json
  - model.tar.gz exists as output of the training job named <job_name>

Creates:
  - A new Model Package version inside the Model Package Group <LAB_PREFIX>-telco-churn
  - Status starts as PendingManualApproval (you can approve later)
"""

import os, json, boto3, botocore
from datetime import datetime
from sagemaker import image_uris

# ---- Env ----
REGION       = os.environ.get("AWS_REGION", "ap-northeast-2")
BUCKET       = os.environ["BUCKET"]
LABP         = os.environ.get("LAB_PREFIX", "student")
SM_ROLE      = os.environ["SM_ROLE_ARN"]
S3_ARTIFACTS = os.environ["S3_ARTIFACTS"]  # e.g., s3://.../artifacts

# ---- Clients ----
boto_sess = boto3.Session(region_name=REGION)
sm = boto_sess.client("sagemaker")
s3 = boto_sess.client("s3")

def latest_evaluation_key():
    """
    Return (job_name, key) for the most recent artifacts/evaluation/<job_name>/evaluation.json.
    Looks under both canonical and LAB_PREFIX-prefixed paths, with pagination.
    """
    prefixes = [
        "artifacts/evaluation/",
        f"{LABP}/artifacts/evaluation/",  # tolerate alternate layout if present
    ]
    candidates = []
    paginator = s3.get_paginator("list_objects_v2")
    for pref in prefixes:
        for page in paginator.paginate(Bucket=BUCKET, Prefix=pref):
            for obj in page.get("Contents", []):
                k = obj["Key"]
                if k.endswith("/evaluation.json"):
                    candidates.append((obj["LastModified"], k))
    if not candidates:
        raise SystemExit("No evaluation.json found under artifacts/evaluation/. Run Lab 6 first.")
    last_mod, key = max(candidates, key=lambda x: x[0])
    job_name = key.rstrip("/").split("/")[-2]
    return job_name, key

def load_json_from_s3(bucket, key):
    obj = s3.get_object(Bucket=bucket, Key=key)
    return json.loads(obj["Body"].read())

def ensure_model_package_group(name: str) -> str:
    """
    Create (if needed) and return the Model Package Group ARN.
    """
    try:
        out = sm.describe_model_package_group(ModelPackageGroupName=name)
        return out["ModelPackageGroupArn"]
    except botocore.exceptions.ClientError as e:
        if e.response["Error"]["Code"] != "ValidationException":
            raise
        out = sm.create_model_package_group(
            ModelPackageGroupName=name,
            ModelPackageGroupDescription="Telco churn model family",
            # Tags belong on the group (NOT individual versions); add here if desired.
            # Tags=[{"Key":"lab","Value":"lab7"},{"Key":"owner","Value":LABP}],
        )
        return out["ModelPackageGroupArn"]

def main():
    # 1) Find newest evaluation.json & parse it
    job_name, eval_key = latest_evaluation_key()
    evaluation = load_json_from_s3(BUCKET, eval_key)

    # 2) Find the training job's model artifact (model.tar.gz)
    desc = sm.describe_training_job(TrainingJobName=job_name)
    model_data_url = desc["ModelArtifacts"]["S3ModelArtifacts"]

    # 3) Pull headline metrics for description
    pr_auc  = float(evaluation["test"]["pr_auc"])
    roc_auc = float(evaluation["test"]["roc_auc"])
    # threshold_star may exist, but isn't required here

    # 4) Resolve framework inference image for region
    sklearn_image = image_uris.retrieve(
        framework="sklearn",
        region=REGION,
        version="1.2-1",
        image_scope="inference",
        py_version="py3",
    )

    # 5) Ensure group exists
    mpg_name = f"{LABP}-telco-churn"
    ensure_model_package_group(mpg_name)

    # 6) Point metrics to the eval JSON written by Lab 6
    eval_json_s3 = f"{S3_ARTIFACTS.rstrip('/')}/evaluation/{job_name}/evaluation.json"

    # 7) Create model package VERSION (no explicit name here)
    create_resp = sm.create_model_package(
        ModelPackageGroupName=mpg_name,
        ModelPackageDescription=(
            f"Telco churn LogReg. ROC AUC={roc_auc:.3f}, PR AUC={pr_auc:.3f}. Job={job_name}"
        ),
        InferenceSpecification={
            "Containers": [{"Image": sklearn_image, "ModelDataUrl": model_data_url}],
            "SupportedContentTypes": ["text/csv", "application/json"],
            "SupportedResponseMIMETypes": ["application/json"],
        },
        ModelApprovalStatus="PendingManualApproval",
        ModelMetrics={
            "ModelQuality": {
                "Statistics": {
                    "ContentType": "application/json",
                    "S3Uri": eval_json_s3,
                }
            }
        },
        # NOTE: Do NOT set Tags here — tags are not supported on versions.
    )

    print("✓ Registered model:", create_resp["ModelPackageArn"])

if __name__ == "__main__":
    main()
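latest_evaluation_key() above recovers the training job name from the second-to-last path segment of the S3 key. That parsing can be checked offline; the key names below are illustrative:

```python
def job_name_from_eval_key(key: str) -> str:
    # "artifacts/evaluation/<job_name>/evaluation.json" -> "<job_name>"
    return key.rstrip("/").split("/")[-2]

# Works for both the canonical and the LAB_PREFIX-prefixed layouts:
print(job_name_from_eval_key("artifacts/evaluation/telco-train-001/evaluation.json"))
# → telco-train-001
print(job_name_from_eval_key("student/artifacts/evaluation/my-job/evaluation.json"))
# → my-job
```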

Step B — Version Control

From repo root:

git add sagemaker/code/register.py
git commit -m "Lab 7: add register.py for pipelines"
git push

Tip: Alternatively, stage and lint everything in one go with git add -A && pre-commit run --all-files before committing.


Step C — Connect to Later Labs

  • Lab 8 (Real-time Endpoint): Instead of hardcoding artifact S3 paths, you’ll import the latest Approved ARN from the registry.
  • Lab 10 (Pipeline): The pipeline definition will call this script as a step, so registration happens automatically after training + evaluation.

↑ Back to top


Lab 8: Batch Transform

Offline Scoring


Objective

Score datasets at scale and on demand without running servers. You’ll:

  • Write a small inference script (defines the input/output contract).
  • Create a Model using your latest Approved model from the Registry (Lab 7).
  • Launch a Batch Transform job on a test split in S3 and inspect outputs.
  • Add cost/time tips (row caps, parallelism).
  • Export a Python file for Lab 10 (Pipelines).

Why this Lab (MLOps context)

  • Batch is the safest first deployment: no servers, no idle cost; great for big jobs (overnight scoring, backfills, scheduled runs).
  • It solidifies your inference “contract”: what input you expect (schema) and what output you emit (scores/labels).
  • The same contract will be reused by the real‑time endpoint in Lab 9, so consistency here avoids production drift.
  • In production, teams often separate preprocessing (ETL/feature engineering) from inference
    • Batch inference pipelines usually run nightly/weekly, taking already feature-ready tables (like your f0...fN,label splits).
      • Think: customer features already joined + normalized by the Data Engineering team.
      • The ML pipeline simply loads these feature tables and scores them.

Plain words: Batch is “rent compute resources for a short time to score a pile of rows.” No always‑on endpoint needed.


Prerequisites

  • Lab 7 completed with at least one Approved model package in the group <LAB_PREFIX>-telco-churn.
  • Environment loaded:
source ~/mlops-env.sh
echo $AWS_REGION $BUCKET $LAB_PREFIX $S3_DATA_PROCESSED $S3_ARTIFACTS $SM_ROLE_ARN
  • Python venv active in your DevBox.

Step 1 — Decide the inference contract (what goes in, what comes out)

Concept (what & why)

  • Input (CSV): rows of features with a header. If the label column is present (e.g., Churn), we drop it at inference.
  • Output (CSV): for each input row, we return:
    • proba — the model’s positive‑class probability,
    • pred — the class (0/1) decided at a default threshold (0.5 for now; you can later use threshold_star).

This “contract” must be stable across Batch and Real‑time so downstream consumers don’t break.

Plain words: We agree on “what we feed the model” and “what we get back.”
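The contract can be exercised end-to-end with plain pandas before touching SageMaker. A minimal sketch: column names and probabilities below are made up for illustration, and the threshold is the default 0.5:

```python
import io
import pandas as pd

# Inbound CSV: feature columns plus a label column that inference must drop.
raw = "f0,f1,Churn\n0.1,0.9,1\n0.4,0.2,0\n"
df = pd.read_csv(io.StringIO(raw))
X = df.drop(columns=["Churn"], errors="ignore")  # input side of the contract

# Outbound CSV: one proba,pred pair per input row (probabilities are made up).
proba = [0.81, 0.12]
out = pd.DataFrame({"proba": proba, "pred": [int(p >= 0.5) for p in proba]})
print(out.to_csv(index=False))
# → proba,pred
#   0.81,1
#   0.12,0
```

If this round trip is unambiguous to you, the real inference.py in Step 2 will hold no surprises: it is the same drop-label / score / emit-two-columns flow with schema enforcement added.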


Step 2 — Create inference.py (the serving logic)

Concept (what & why)

SageMaker’s framework containers (scikit‑learn) look for four optional functions in your script:

  • model_fn(model_dir) — load the trained artifact (and any preprocessors).
  • input_fn(raw_body, content_type) — parse the inbound payload.
  • predict_fn(parsed, model) — run model inference.
  • output_fn(prediction, accept) — serialize output.

In our labs we saved preprocessing and the model as two separate artifacts: preprocess.joblib (Lab 4) and model.joblib (Lab 5). In this Batch Transform lab we score the already-processed test split, so the inference code only needs to load model.joblib and can assume the engineered feature columns are present. In Lab 9 (real-time), where requests arrive unprocessed, we will wire in preprocess.joblib so raw payloads are transformed before prediction.

Create a Python file sagemaker/code/inference.py:

This is our custom inference.py, implementing the four functions described above. The code has been tuned for our Churn dataset and our current “best” model.


# sagemaker/code/inference.py
"""
Inference entry point for SageMaker (scikit-learn container).
Implements model_fn / input_fn / predict_fn / output_fn.

Contract:
- Input: CSV with header; drops 'Churn' label if present.
- Output: CSV with columns: proba,pred
"""

import os
import io
import json
import joblib
import numpy as np
import pandas as pd
import boto3
from urllib.parse import urlparse



# --- 1) Load the model from /opt/ml/model ---
def model_fn(model_dir: str):
    model = joblib.load(f"{model_dir}/model.joblib")
    # If training saved a dict, extract the estimator
    if isinstance(model, dict):
        for key in ("model", "estimator", "clf", "sk_model"):
            if key in model:
                model = model[key]
                break
    return model

# --- 2) Parse incoming request ---
# --- input_fn: read CSV, drop label, enforce feature contract ---

# --- helper: read JSON from a local path or S3 ---
def _read_json_anywhere(path_or_s3uri):
    """Read JSON from local path or s3://bucket/key and return the parsed object."""
    if path_or_s3uri.startswith("s3://"):
        u = urlparse(path_or_s3uri)
        bucket = u.netloc
        key = u.path.lstrip("/")
        s3 = boto3.client("s3")
        obj = s3.get_object(Bucket=bucket, Key=key)
        return json.loads(obj["Body"].read().decode("utf-8"))
    else:
        with open(path_or_s3uri, "r") as f:
            return json.load(f)


def _load_columns():
    """
    Return a list[str] of column names from columns.json.
    Accepts either:
      - {"columns": [...]}  (preferred)
      - [...]               (raw list)
    """
    # priority: explicit env override -> model directory default
    candidates = []
    if os.getenv("FEATURE_LIST_JSON"):
        candidates.append(os.getenv("FEATURE_LIST_JSON"))
    candidates.append("/opt/ml/model/columns.json")

    last_err = None
    for src in candidates:
        try:
            data = _read_json_anywhere(src)
            # normalize shape
            if isinstance(data, dict) and "columns" in data:
                cols = data["columns"]
            elif isinstance(data, list):
                cols = data
            else:
                raise ValueError(f"Unsupported columns.json structure from {src}: {type(data)}")

            # final validation
            if not isinstance(cols, list) or not all(isinstance(c, str) for c in cols):
                raise ValueError(f"columns must be a list[str]; got {type(cols)} from {src}")

            # success
            print(f"[input_fn] Loaded {len(cols)} columns from {src}")
            return cols

        except Exception as e:
            last_err = e
            print(f"[input_fn] WARN: failed to load columns from {src}: {e}")

    raise RuntimeError(f"Could not load columns.json from any source. Last error: {last_err}")


def input_fn(input_data, content_type):
    """
    Parse incoming CSV using schema from columns.json.
    - Drops 'label' if present in schema.
    - Requires header row and all required feature columns.
    - Returns numpy array in the training feature order.
    """
    if content_type != "text/csv":
        raise ValueError(f"Unsupported content type: {content_type}")

    expected = _load_columns()
    feature_cols = [c for c in expected if c != "label"]

    # read CSV (expects header)
    df = pd.read_csv(io.StringIO(input_data))

    # quick visibility (first 5 cols)
    print(f"[input_fn] Incoming header count={len(df.columns)} first5={list(df.columns[:5])}")
    print(f"[input_fn] Expected features count={len(feature_cols)} first5={feature_cols[:5]}")

    # require all features
    missing = [c for c in feature_cols if c not in df.columns]
    if missing:
        raise ValueError(f"Missing required feature columns: {missing}. "
                         "This violates the preprocessing→inference contract.")

    # select in the exact training order; keep any extra columns out
    X = df[feature_cols].to_numpy(dtype="float32")
    print(f"[input_fn] Inference matrix shape: {X.shape} (rows, features)")
    return X

# --- 3) Run prediction ---
# --- threshold discovery (at module import time) ---
THRESHOLD = 0.5  # fallback; replaced below by _discover_threshold()

def _parse_s3(uri: str):
    """Split s3://bucket/key into (bucket, key)."""
    if not uri.startswith("s3://"):
        raise ValueError(f"Not an s3 URI: {uri}")
    b, k = uri[5:].split("/", 1)
    return b, k

def _load_eval_from_s3(s3_client, s3_uri: str) -> dict:
    b, k = _parse_s3(s3_uri)
    obj = s3_client.get_object(Bucket=b, Key=k)
    return json.loads(obj["Body"].read())

def _discover_threshold() -> float:
    # 1) explicit override wins
    val = os.environ.get("THRESHOLD")
    if val:
        try:
            return float(val)
        except ValueError:
            print(f"[WARN] THRESHOLD not a float: {val!r}; ignoring")

    # 2) exact evaluation.json S3 location
    eval_uri = os.environ.get("EVAL_JSON_S3")
    if eval_uri:
        try:
            s3 = boto3.client("s3", region_name=os.environ.get("AWS_REGION"))
            evaluation = _load_eval_from_s3(s3, eval_uri)
            return float(evaluation["test"]["threshold_star"])
        except Exception as e:
            print(f"[WARN] Could not read EVAL_JSON_S3 ({eval_uri}): {e}")

    # 3) fallback
    print("[INFO] Using default threshold=0.5 (no THRESHOLD or EVAL_JSON_S3).")
    return 0.5

THRESHOLD = _discover_threshold()
print(f"[INFO] Decision threshold in use: {THRESHOLD:.3f}")

def predict_fn(parsed_input, model):
    if isinstance(model, dict):
        # defensive in case model_fn didn’t unwrap
        for key in ("model", "estimator", "clf", "sk_model"):
            if key in model:
                model = model[key]
                break
    if not hasattr(model, "predict_proba"):
        raise ValueError(f"Loaded model of type {type(model)} lacks predict_proba")
    proba_pos = model.predict_proba(parsed_input)[:, 1]
    pred = (proba_pos >= THRESHOLD).astype(int)
    return np.column_stack([proba_pos, pred])

# --- 4) Serialize output ---
def output_fn(prediction, accept: str):
    """
    Serialize to CSV with header 'proba,pred'.
    """
    if accept in ("text/csv", "application/json"):
        # Convert to CSV
        out = io.StringIO()
        out.write("proba,pred\n")
        # Ensure native types
        for p, y in prediction:
            out.write(f"{float(p)},{int(y)}\n")
        body = out.getvalue()
        return body, "text/csv"
    raise ValueError(f"Unsupported accept: {accept}")

Sanity tip: This script is deliberately small, yet it already enforces the feature contract via columns.json and reads its threshold from env vars or evaluation.json. For production you might add stricter type/range validation, request size limits, and structured logging.
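The precedence in _discover_threshold() (explicit env override first, then evaluation.json's threshold_star, then 0.5) can be isolated as a pure function and tested without AWS. A minimal sketch of the same logic:

```python
def choose_threshold(env_value, eval_doc, default=0.5):
    """Precedence: explicit THRESHOLD env var > evaluation.json's threshold_star > default."""
    if env_value:
        try:
            return float(env_value)
        except ValueError:
            pass  # malformed override: fall through, mirroring inference.py's warning path
    if eval_doc:
        try:
            return float(eval_doc["test"]["threshold_star"])
        except (KeyError, TypeError, ValueError):
            pass
    return default

print(choose_threshold("0.35", None))                              # → 0.35
print(choose_threshold(None, {"test": {"threshold_star": 0.42}}))  # → 0.42
print(choose_threshold("oops", None))                              # → 0.5
```

Keeping decision logic in pure functions like this is what makes serving scripts unit-testable outside the container.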


Step 3 — Resolve model artifact and latest Approved package

In the next few steps, we will run a SageMaker Batch Transform interactively from Jupyter so you can see each stage of the process. Create a new notebook in VS Code (batch_transform.ipynb) and run the code cells through Step 7.

Concept (what & why)

We need to fetch the latest Approved package from the SageMaker Model Registry (created in Lab 7). That package contains the frozen combination of:

  • The trained model artifact (model.tar.gz)
  • The container image URI (runtime environment)
  • The evaluation metrics (evaluation.json)

By doing this, we ensure our batch job runs on a governed, approved model rather than some arbitrary checkpoint.

import os, time, uuid, boto3
from pathlib import Path
from sagemaker import Session, image_uris
from sagemaker.sklearn.model import SKLearnModel

# --- Step 1: Env/clients ---
REGION       = os.environ.get("AWS_REGION", "ap-northeast-2")
BUCKET       = os.environ["BUCKET"]
LABP         = os.environ.get("LAB_PREFIX", "student")
SM_ROLE_ARN  = os.environ["SM_ROLE_ARN"]
S3_ARTIFACTS = os.environ["S3_ARTIFACTS"]

boto_sess = boto3.Session(region_name=REGION)
sm = boto_sess.client("sagemaker")
s3 = boto_sess.client("s3")
sagemaker_sess = Session(boto_session=boto_sess)

# --- Step 2: resolve your entrypoint (relative to this notebook) ---
entry_point_path = Path("../sagemaker/code/inference.py").resolve()
assert entry_point_path.exists(), f"Missing: {entry_point_path}"

# --- Step 3: latest Approved package ARN ---
MPG = f"{LABP}-telco-churn"
r = sm.list_model_packages(
    ModelPackageGroupName=MPG,
    ModelApprovalStatus="Approved",
    SortBy="CreationTime", SortOrder="Descending", MaxResults=1
)
assert r.get("ModelPackageSummaryList"), "No Approved package; approve one in Lab 7."
pkg_arn = r["ModelPackageSummaryList"][0]["ModelPackageArn"]

# --- Describe package -> extract components ---
info = sm.describe_model_package(ModelPackageName=pkg_arn)
image_uri      = info["InferenceSpecification"]["Containers"][0]["Image"]
model_data_url = info["InferenceSpecification"]["Containers"][0]["ModelDataUrl"]
eval_json_s3   = info["ModelMetrics"]["ModelQuality"]["Statistics"]["S3Uri"]

print("ENTRY:", entry_point_path)
print("IMAGE:", image_uri)
print("MODEL:", model_data_url)
print("EVAL :", eval_json_s3)

Plain words: We just pulled down the official runtime container, the trained model weights, and the evaluation metrics reference. This ensures our transform uses the same validated package as deployment.


Step 4 — Create a Model bound to our inference script

Concept (what & why)

By default, a Model Package only knows about the container and model weights.

But for batch scoring, we also need our custom inference.py, so that:

  • Input CSV is parsed correctly.
  • Extra/missing columns are handled consistently.
  • Predictions are written in the right format.

The SKLearnModel class takes care of uploading your inference code to S3, wiring environment variables, and creating the SageMaker Model resource.

feature_list_s3 = f"{S3_ARTIFACTS.rstrip('/')}/preprocess/columns.json"

sk_model = SKLearnModel(
    model_data=model_data_url,
    image_uri=image_uri,
    role=SM_ROLE_ARN,
    entry_point=entry_point_path.name,
    source_dir=str(entry_point_path.parent),
    sagemaker_session=sagemaker_sess,
    env={
        "EVAL_JSON_S3": eval_json_s3,
        "FEATURE_LIST_JSON": feature_list_s3
    },
)

Plain words:

This ties together the container + model + your code into a deployable Model object. SageMaker now knows:

“Whenever I need predictions, I’ll spin up this container, load these weights, and run this script.”


Tip: Navigate to AWS Console → Amazon SageMaker AI → Inference → Models to view the uploaded Model.


Step 5 — Choose input data & output location

Concept (what & why)

We’ll locate a recent test.csv from our processed dataset (Labs 3–5). That’s our hold-out evaluation split.

For outputs, we’ll create a timestamped folder under artifacts/batch/ so each run is uniquely identified.

def find_test_input():
    cands = []
    p = s3.get_paginator("list_objects_v2")
    for prefix in ["data/processed/", "artifacts/preprocess/"]:
        for page in p.paginate(Bucket=BUCKET, Prefix=prefix):
            for o in page.get("Contents", []):
                if o["Key"].endswith("test.csv"):
                    cands.append((o["LastModified"], o["Key"]))
    assert cands, "No processed test.csv found."
    _, key = max(cands, key=lambda x: x[0])
    return f"s3://{BUCKET}/{key}"

input_s3 = find_test_input()

from datetime import datetime
output_s3 = f"{S3_ARTIFACTS.rstrip('/')}/batch/{datetime.utcnow().strftime('%Y%m%d-%H%M%S')}/"

print("INPUT:", input_s3)
print("OUT  :", output_s3)

Plain words:

We grab the newest test.csv we can find and set aside a clean output folder. After the job, you’ll find predictions written to test.csv.out.



Step 6 — Launch the Batch Transform job

Concept (what & why)

Batch Transform spins up temporary instances, feeds them your CSV, and saves predictions to S3. You only pay for the run time.

Key knobs:

  • instance_type / instance_count → cost vs speed
  • strategy="MultiRecord" + max_payload / max_concurrent_transforms → batching & parallelism
  • split_type="Line" → CSV rows as records

transformer = sk_model.transformer(
    instance_type="ml.m5.large",
    instance_count=1,
    strategy="MultiRecord",
    assemble_with="Line",
    output_path=output_s3,
    max_payload=6,
    max_concurrent_transforms=2,
)

job_name = f"{LABP}-bt-{uuid.uuid4().hex[:8]}"
transformer.transform(
    data=input_s3,
    content_type="text/csv",
    split_type="Line",
    job_name=job_name
)
print("Started:", job_name)

# --- prove your code is attached ---
d = sm.describe_transform_job(TransformJobName=job_name)
model_name = d.get("ModelName")

env = {}
if model_name:
    m = sm.describe_model(ModelName=model_name)
    env = m.get("PrimaryContainer", {}).get("Environment", {})

print("PROGRAM:", env.get("SAGEMAKER_PROGRAM"))
print("SUBMIT :", env.get("SAGEMAKER_SUBMIT_DIRECTORY"))

# --- Poll status until completion ---
while True:
    d = sm.describe_transform_job(TransformJobName=job_name)
    status = d["TransformJobStatus"]
    print("Status:", status)
    if status in ("Completed", "Failed", "Stopped"):
        if status != "Completed":
            print("FailureReason:", d.get("FailureReason"))
        break
    time.sleep(30)

Expected result:

Status should move from InProgress to Completed.

Then, in S3 you’ll see:

artifacts/batch/<timestamp>/test.csv.out

containing predictions.

Tip: You can also view job status in AWS Console → SageMaker → Batch transform jobs.


Step 7 — Inspect the predictions

Concept (what & why)

We verify the contract: the first row is the header proba,pred, and each following row has two values: the positive-class probability and the predicted class. Because Batch Transform can shard outputs, we’ll look for one or more .out files under the exact S3 prefix we passed as output_path.

# --- Helper: parse an s3://bucket/prefix/ into (bucket, prefix)
from urllib.parse import urlparse

def parse_s3_uri(s3_uri: str):
    u = urlparse(s3_uri)
    assert u.scheme == "s3" and u.netloc, f"Bad S3 URI: {s3_uri}"
    # Ensure trailing slash prefix behavior for list_objects
    prefix = u.path.lstrip("/")
    return u.netloc, prefix

# We set this earlier in Step 6
print("Output (we set this in Step 6):", output_s3)

# --- Find .out objects produced by Batch Transform
out_bucket, out_prefix = parse_s3_uri(output_s3)
paginator = s3.get_paginator("list_objects_v2")
out_keys = []
for page in paginator.paginate(Bucket=out_bucket, Prefix=out_prefix):
    for obj in page.get("Contents", []):
        if obj["Key"].endswith(".out"):
            out_keys.append(obj["Key"])

assert out_keys, f"No .out files found under {output_s3}"
print(f"Found {len(out_keys)} output file(s). Showing the first one:")
print("  s3://%s/%s" % (out_bucket, out_keys[0]))

# --- Read & preview the first ~10 lines
resp = s3.get_object(Bucket=out_bucket, Key=out_keys[0])
lines = resp["Body"].read().decode("utf-8").splitlines()

# Sanity checks on the "contract"
assert lines, "Output file is empty."
header = lines[0].strip()
print("Header:", header)
if header.lower() != "proba,pred":
    print("WARNING: Unexpected header. Did your inference script change?")
    print("First line raw:", header)

print("\nPreview (first 10 lines):")
for row in lines[:10]:
    print(row)

Expected result (shape): a proba,pred header line, followed by one <probability>,<0 or 1> row per input record.

Notes & gotchas

  • Multiple shards: If your input contained multiple files or was large, Batch may write several .out objects. We list them all and preview the first one; students can loop over out_keys to aggregate.
  • Where this lives: Outputs are under the output_s3 prefix we set (e.g., s3://…/artifacts/batch/20250826-055433/). Keeping this under your project bucket (not the SageMaker default) aligns with the Lab 4 “self-contained project” design.
  • Header enforcement: The snippet asserts non-empty output and prints a warning if the header isn’t exactly proba,pred. That’s intentional—if someone edits inference.py later, this check surfaces drift in the output schema early.
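The header check above can be factored into a small validator and reused across shards. An offline sketch (the sample lines are illustrative, not real job output):

```python
def validate_out_lines(lines):
    """Validate the proba,pred contract; return parsed (proba, pred) tuples."""
    if not lines:
        raise ValueError("Output file is empty.")
    if lines[0].strip().lower() != "proba,pred":
        raise ValueError(f"Unexpected header: {lines[0]!r}")
    rows = []
    for ln in lines[1:]:
        p, y = ln.split(",")
        p, y = float(p), int(y)
        if not (0.0 <= p <= 1.0) or y not in (0, 1):
            raise ValueError(f"Row violates contract: {ln!r}")
        rows.append((p, y))
    return rows

print(validate_out_lines(["proba,pred", "0.91,1", "0.08,0"]))
# → [(0.91, 1), (0.08, 0)]
```

Running this over every key in out_keys gives you a cheap, automated contract check after each batch run.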

Step 8 — Cost/time tips (practical knobs)

  • Row cap for demos: Use only the first N rows (e.g., upload a test_head_10k.csv) for faster runs.
  • Parallelism: Tune max_concurrent_transforms, max_payload, and strategy="MultiRecord" for speed.
  • Instance choice: ml.m5.large is fine for demo; scale up and/or increase instance_count for big workloads.
  • Compression: Gzip inputs/outputs if I/O bound (set accept/content_type accordingly).
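The row-cap tip takes only a few lines of pandas before uploading. A sketch: the five-row inline CSV stands in for your real test split, and test_head_10k.csv is just a name we chose for the capped file:

```python
import io
import pandas as pd

# Stand-in for the full test split (5 tiny rows instead of the real file).
full = pd.read_csv(io.StringIO("f0,f1\n" + "\n".join(f"{i},{i * 2}" for i in range(5))))

# Keep only the first N rows for a fast, cheap demo run.
N = 3
capped = full.head(N)
print(len(capped))  # → 3

# To produce the demo file, then upload it:
# capped.to_csv("test_head_10k.csv", index=False)
# aws s3 cp test_head_10k.csv s3://<BUCKET>/data/processed/test_head_10k.csv
```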

Comprehension Checkpoints

  • Q: When do I choose Batch over Real‑time?

    A: When latency isn’t critical and/or you need to score large files economically (no 24/7 endpoint).

  • Q: What’s the “inference contract”?

    A: A stable definition of input & output (schema, formats, fields) that both Batch and Real‑time must follow.

  • Q: Where do costs come from here?

    A: Instance time during the transform; you don’t pay when idle.


How This Fits the Workflow

  • Lab 7 gave us a governed model (with metrics + approval).
  • Lab 8 proves the inference contract and scores a dataset safely and cheaply.
  • Next (Lab 9): deploy a Real‑time endpoint reusing the same script/contract.

From Notebook to Pipeline

(export a Python file + Git)

Why a Python file? Pipelines (Lab 10) don’t run notebooks; they call Python entrypoints. We’ll export minimal logic to scripts/submit_batch_transform.py so a pipeline step can run Batch right after registration/approval.

Step A — Create scripts/submit_batch_transform.py

# scripts/submit_batch_transform.py
"""
Run a SageMaker Batch Transform using the latest Approved model package in <LAB_PREFIX>-telco-churn.
Mirrors the working notebook flow (describe package → SKLearnModel → transformer), but:
  - uploads source_dir.tar.gz to YOUR project bucket via code_location
  - passes FEATURE_LIST_JSON and EVAL_JSON_S3 to inference.py
  - avoids dereferencing fields that can be None (no 'subscriptable' crashes)

Env expected (exported by ~/mlops-env.sh):
  AWS_REGION, LAB_PREFIX, BUCKET, S3_ARTIFACTS, SM_ROLE_ARN
"""

import logging
import os
import uuid
from datetime import datetime

import boto3

from sagemaker import Session
from sagemaker.sklearn.model import SKLearnModel

for name in ("sagemaker", "boto3", "botocore", "urllib3", "s3transfer"):
    logging.getLogger(name).setLevel(logging.WARNING)


# --- Env / clients ---
REGION = os.environ.get("AWS_REGION", "ap-northeast-2")
BUCKET = os.environ["BUCKET"]
LABP = os.environ.get("LAB_PREFIX", "student")
SM_ROLE_ARN = os.environ["SM_ROLE_ARN"]
S3_ARTIFACTS = os.environ["S3_ARTIFACTS"].rstrip("/")  # e.g., s3://<bucket>/<prefix>

boto_sess = boto3.Session(region_name=REGION)
sm = boto_sess.client("sagemaker")
s3 = boto_sess.client("s3")
sess = Session(boto_session=boto_sess)


def latest_approved_pkg_arn(group: str) -> str:
    r = sm.list_model_packages(
        ModelPackageGroupName=group,
        ModelApprovalStatus="Approved",
        SortBy="CreationTime",
        SortOrder="Descending",
        MaxResults=1,
    )
    lst = r.get("ModelPackageSummaryList", [])
    if not lst:
        raise SystemExit("No Approved model package. Approve one in Lab 7.")
    return lst[0]["ModelPackageArn"]


def describe_package_bits(pkg_arn: str):
    info = sm.describe_model_package(ModelPackageName=pkg_arn)
    c = info["InferenceSpecification"]["Containers"][0]
    image_uri = c["Image"]
    model_data_url = c["ModelDataUrl"]
    eval_json_s3 = info["ModelMetrics"]["ModelQuality"]["Statistics"]["S3Uri"]
    return image_uri, model_data_url, eval_json_s3


def find_test_input() -> str:
    cands = []
    p = s3.get_paginator("list_objects_v2")
    for prefix in ("data/processed/", "artifacts/preprocess/"):
        for page in p.paginate(Bucket=BUCKET, Prefix=prefix):
            for o in page.get("Contents", []) or []:
                if o["Key"].endswith("test.csv"):
                    cands.append((o["LastModified"], o["Key"]))
    if not cands:
        raise SystemExit("No processed test.csv found.")
    _, key = max(cands, key=lambda x: x[0])
    return f"s3://{BUCKET}/{key}"


def main():
    group = f"{LABP}-telco-churn"
    pkg_arn = latest_approved_pkg_arn(group)
    image_uri, model_data_url, eval_json_s3 = describe_package_bits(pkg_arn)

    # Contract (columns) published by preprocessing
    feature_list_s3 = f"{S3_ARTIFACTS}/preprocess/columns.json"

    # Build a Model that packages our inference.py; keep code in OUR bucket.
    sk_model = SKLearnModel(
        model_data=model_data_url,
        image_uri=image_uri,
        role=SM_ROLE_ARN,
        entry_point="inference.py",
        source_dir="sagemaker/code",
        sagemaker_session=sess,
        code_location=f"{S3_ARTIFACTS}/code",  # <-- keeps source_dir.tar.gz in project bucket
        env={
            "EVAL_JSON_S3": eval_json_s3,
            "FEATURE_LIST_JSON": feature_list_s3,
        },
    )

    # Transformer (ephemeral batch job)
    out = f"{S3_ARTIFACTS}/batch/{datetime.utcnow().strftime('%Y%m%d-%H%M%S')}/"
    transformer = sk_model.transformer(
        instance_type="ml.m5.large",
        instance_count=1,
        strategy="MultiRecord",
        assemble_with="Line",
        output_path=out,
        max_payload=6,
        max_concurrent_transforms=2,
    )

    # Submit job (no log streaming)
    job_name = f"{LABP}-bt-{uuid.uuid4().hex[:8]}"
    input_s3 = find_test_input()

    # Start the transform but do NOT wait here (avoids streaming container logs)
    transformer.transform(
        data=input_s3,
        content_type="text/csv",
        split_type="Line",
        job_name=job_name,
        wait=False,  # <-- key: don't stream logs
    )

    print(f"Started: {job_name}")
    print(f"Input  : {input_s3}")
    print(f"Output : {out}")  # single source of truth for where predictions will land

    # Quiet poll loop (minimal status line; no container logs)
    import time

    while True:
        d = sm.describe_transform_job(TransformJobName=job_name)
        status = d.get("TransformJobStatus", "Unknown")
        print("Status:", status)
        if status in ("Completed", "Failed", "Stopped"):
            if status != "Completed":
                # print a concise failure reason if present (still quiet)
                fr = d.get("FailureReason")
                if fr:
                    print("FailureReason:", fr)
            # Optionally, confirm the exact S3 output SageMaker returns
            s3_path = (d.get("TransformOutput") or {}).get("S3OutputPath") or out
            print("Final output:", s3_path)
            break
        time.sleep(30)


if __name__ == "__main__":
    main()

Step B — Run it from the terminal and verify results

Use your repo root (where scripts/ and sagemaker/ live).

# 1) Load env once per shell (as in earlier labs)
cd ~/mlops-day2
source ~/mlops-env.sh

# 2) Run the job
python scripts/submit_batch_transform.py

You should see the job name, input, and output printed, followed by a status line every 30 seconds until the job reaches Completed (or Failed, with a concise FailureReason).

As before, verify the batch predictions by examining the bucket printed as Final output. Copy the Output value printed by the script into $OUT, then:

OUT='s3://<your-bucket>/<your-prefix>/artifacts/batch/<stamp>/'
aws s3 ls "$OUT"

# stream first 10 lines to verify header and a few rows
aws s3 cp "${OUT}test.csv.out" - | head -n 10
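Once the file looks right, you can pull it down and sanity-check the predictions. A small helper sketch, assuming the output carries the proba,pred header the verification step above expects:

```python
import pandas as pd

def summarize_predictions(path: str) -> pd.Series:
    """Quick sanity check on a downloaded batch output file."""
    preds = pd.read_csv(path)                      # expects columns: proba, pred
    assert preds["proba"].between(0, 1).all(), "probabilities out of [0, 1]"
    assert set(preds["pred"].unique()) <= {0, 1}, "pred must be binary"
    return preds["pred"].value_counts()
```

For example, after `aws s3 cp "${OUT}test.csv.out" preds.csv`, run `summarize_predictions("preds.csv")` to see the class balance of the scored batch.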

Step C — Version control

git add scripts/submit_batch_transform.py sagemaker/code/inference.py
git commit -m "Lab 8: batch transform (robust code_location, no subscriptable deref)"
git push

Mini-glossary (plain language)

  • Batch Transform: Ephemeral job that reads from S3, writes predictions to S3; you only pay while it runs.
  • Model (SageMaker): A deployable binding of container + model artifact + optional user code.
  • Inference script (inference.py): Implements input_fn/predict_fn/output_fn (our contract).
  • Contract: Stable I/O shape and column alignment rules the runtime honors (we load columns.json, detect/ignore label, drop unknowns, and keep order faithful to training).
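The alignment rules in the last bullet fit in a few lines. A minimal sketch, assuming a hypothetical contract shape of `{"features": [...], "label": "Churn"}` (your columns.json keys may differ):

```python
import json
import pandas as pd

def align_to_contract(df: pd.DataFrame, contract_path: str) -> pd.DataFrame:
    """Apply the I/O contract: ignore label, drop unknowns, restore training order."""
    with open(contract_path) as f:
        contract = json.load(f)
    features = contract["features"]
    label = contract.get("label")
    if label and label in df.columns:      # detect/ignore label if present
        df = df.drop(columns=[label])
    missing = [c for c in features if c not in df.columns]
    if missing:
        raise ValueError(f"Missing required columns: {missing}")
    return df[features]                    # drops extras, keeps training order
```

Extra columns and shuffled order are tolerated; a missing required feature fails loudly, which is the same policy Lab 10's schema guard applies before any compute is provisioned.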

Next lab (Lab 9) — Real-time inference (endpoint)

We’ll stand up a real-time endpoint that uses the same contract and the same inference.py, verify health checks (/ping), issue live predictions, and compare latency & ergonomics vs Batch.

↑ Back to top


Lab 9 — Real-Time Inference Endpoint

(Raw → Preprocess → Score)


Learning goals

  • What a SageMaker real-time endpoint is and when to use it vs Batch.
  • How to deploy the latest Approved package with your real-time entrypoint.
  • How to preprocess raw inputs at inference using your saved preprocess.joblib, then score with model.joblib.
  • How to apply the evaluation threshold (from evaluation.json) consistently.
  • How to invoke the endpoint with CSV or JSON and clean up.

Real-time vs Batch (quick)

  • Real-time: sub-second to seconds latency for small requests (APIs, UI callbacks). You pay while the endpoint is running.
  • Batch: large files on a schedule; no server to keep warm (you already did this in Lab 8).

Prereqs

  • Lab 7 produced an Approved package in group <LAB_PREFIX>-telco-churn.
  • Lab 4 created and saved preprocess.joblib at s3://$S3_ARTIFACTS/preprocess/preprocess.joblib.
  • columns.json is at s3://$S3_ARTIFACTS/preprocess/columns.json (kept for compatibility/inspection).
  • evaluation.json is at s3://$S3_ARTIFACTS/evaluation/<timestamp>/evaluation.json.
  • Export your env each shell: source ~/mlops-env.sh.

Step 1 — Create sagemaker/code/inference_rt.py (real-time entrypoint)

This is your real-time server. It loads preprocess.joblib from your project bucket, loads model.joblib from the model artifact, pulls the decision threshold from evaluation.json (via EVAL_JSON_S3), accepts raw CSV/JSON, transforms, scores, and returns proba,pred.
# sagemaker/code/inference_rt.py
"""
Real-time inference entry point for SageMaker (scikit-learn container).

This server handles *raw* inputs:
  - Loads preprocess.joblib from S3_ARTIFACTS/preprocess (or local fallback)
  - Loads model.joblib from the model artifact
  - Reads decision threshold from evaluation.json (EVAL_JSON_S3), fallback 0.5
  - Accepts CSV or JSON payloads
  - Returns CSV or JSON with: proba,pred

Environment (set by deploy script):
  EVAL_JSON_S3         s3://.../evaluation.json
  PREPROCESS_JOBLIB_S3 s3://.../preprocess/preprocess.joblib
  FEATURE_LIST_JSON    (optional, kept for parity/logging, not required in RT)

Contract:
  Input: raw CSV with header (or JSON records) containing the training-time raw columns.
  Output: 'proba,pred' (CSV by default) or JSON [{"proba":..., "pred":...}, ...]
"""

import os
import io
import json
import tarfile
import tempfile
import logging
from typing import Any, Dict, Tuple, List

import boto3
import joblib
import numpy as np
import pandas as pd

# Reduce noisy container/server logs (keep them readable in CloudWatch)
logging.getLogger("werkzeug").setLevel(logging.WARNING)
LOG = logging.getLogger("inference_rt")
LOG.setLevel(logging.INFO)

# ---- small S3 helpers ----
def _is_s3(uri: str) -> bool:
    return isinstance(uri, str) and uri.startswith("s3://")

def _split_s3(uri: str) -> Tuple[str, str]:
    if not _is_s3(uri):
        raise ValueError(f"Not an s3 URI: {uri}")
    b, k = uri[5:].split("/", 1)
    return b, k

def _read_s3_json(s3_uri: str, sess=None) -> dict:
    if sess is None:
        sess = boto3.Session()
    s3 = sess.client("s3")
    b, k = _split_s3(s3_uri)
    obj = s3.get_object(Bucket=b, Key=k)
    return json.loads(obj["Body"].read().decode("utf-8"))

def _download_s3_to_path(s3_uri: str, dst_path: str, sess=None) -> str:
    if sess is None:
        sess = boto3.Session()
    s3 = sess.client("s3")
    b, k = _split_s3(s3_uri)
    os.makedirs(os.path.dirname(dst_path), exist_ok=True)
    s3.download_file(b, k, dst_path)
    return dst_path

# ---- threshold loader (robust to several common metric key names) ----
def _load_threshold() -> float:
    """
    Try to read the decision threshold from evaluation.json.
    We accept multiple common key names to be resilient to how the eval was written.
    Fallback to 0.5 if nothing sensible is found.
    """
    uri = os.environ.get("EVAL_JSON_S3", "")
    if not _is_s3(uri):
        LOG.info("[INFO] No EVAL_JSON_S3 set, defaulting threshold=0.5")
        return 0.5

    try:
        j = _read_s3_json(uri)
    except Exception as e:
        LOG.info(f"[INFO] Failed to read evaluation JSON ({uri}): {e}. Using threshold=0.5")
        return 0.5

    # Candidates in rough priority order:
    candidates = [
        ("metrics", "decision_threshold"),
        ("decision_threshold",),
        ("best_f1_threshold",),
        ("best_threshold",),
        ("threshold",),
        ("binary_classification_metrics", "decision_threshold"),
    ]
    for path in candidates:
        node = j
        ok = True
        for k in path:
            if isinstance(node, dict) and k in node:
                node = node[k]
            else:
                ok = False
                break
        if ok and isinstance(node, (int, float)):
            return float(node)

    LOG.info("[INFO] No threshold found in eval JSON. Using threshold=0.5")
    return 0.5

# ---- model loader (SageMaker calls this once per container) ----
def model_fn(model_dir: str) -> Dict[str, Any]:
    """
    Load model + preprocess + threshold. SageMaker passes /opt/ml/model here.
    - model.joblib must be inside model_dir (from model.tar.gz).
    - preprocess.joblib is pulled from PREPROCESS_JOBLIB_S3 (project bucket),
      or fallback to model_dir/preprocess.joblib if present.
    """
    # Load model
    model_path = os.path.join(model_dir, "model.joblib")
    if not os.path.exists(model_path):
        # Some training jobs package the artifact as model.pkl instead
        alt = os.path.join(model_dir, "model.pkl")
        if os.path.exists(alt):
            model_path = alt
    model = joblib.load(model_path)

    # Load preprocess
    preprocess = None
    sess = boto3.Session()
    pre_s3 = os.environ.get("PREPROCESS_JOBLIB_S3", "")
    try:
        if _is_s3(pre_s3):
            local_pre = os.path.join(model_dir, "preprocess.joblib")
            _download_s3_to_path(pre_s3, local_pre, sess=sess)
            preprocess = joblib.load(local_pre)
        else:
            local_pre = os.path.join(model_dir, "preprocess.joblib")
            if os.path.exists(local_pre):
                preprocess = joblib.load(local_pre)
    except Exception as e:
        LOG.info(f"[INFO] Could not load preprocess.joblib ({pre_s3}): {e}")

    if preprocess is None:
        raise RuntimeError(
            "preprocess.joblib not found. Set PREPROCESS_JOBLIB_S3 to s3://.../preprocess/preprocess.joblib"
        )

    # Load threshold
    threshold = float(_load_threshold())
    LOG.info(f"[INFO] Decision threshold in use: {threshold:.3f}")

    return {"model": model, "preprocess": preprocess, "threshold": threshold}

# ---- input parsing ----
def input_fn(request_body: bytes, request_content_type: str) -> pd.DataFrame:
    """
    Accept raw CSV or JSON and return a pandas DataFrame with raw feature columns.
    - CSV: must include a header row with raw column names. If a label is present, we drop it.
    - JSON: accept either a list of records or {"instances": [...]} where each item is a mapping.
    """
    ct = (request_content_type or "").lower().strip()

    if ct.startswith("text/csv"):
        text = request_body.decode("utf-8")
        df = pd.read_csv(io.StringIO(text))
        # Drop common label columns if present
        for lab in ("Churn", "label", "target", "y"):
            if lab in df.columns:
                df = df.drop(columns=[lab])
        if df.empty:
            raise ValueError("Parsed CSV had no usable columns/rows.")
        return df

    if ct.startswith("application/json"):
        j = json.loads(request_body.decode("utf-8"))
        records = j.get("instances", j)  # allow list or {"instances":[...]}
        if not isinstance(records, list):
            raise ValueError("JSON body must be a list of records or {'instances': [...]}")

        # Each record can be a dict (preferred) or a list with matching order to training raw columns
        if all(isinstance(r, dict) for r in records):
            df = pd.DataFrame.from_records(records)
        else:
            # List-of-lists payloads are ambiguous without column names, so reject them.
            raise ValueError("For JSON, each instance should be an object (dict) of raw feature_name: value.")
        # Drop common label names if present
        for lab in ("Churn", "label", "target", "y"):
            if lab in df.columns:
                df = df.drop(columns=[lab])
        if df.empty:
            raise ValueError("Parsed JSON had no usable columns/rows.")
        return df

    raise ValueError(f"Unsupported content type: {request_content_type}. Use text/csv or application/json.")

# ---- predict ----
def predict_fn(raw_df: pd.DataFrame, bundle: Dict[str, Any]):
    """
    raw_df -> preprocess.transform -> model.predict_proba -> (proba, pred>=threshold)
    """
    preprocess = bundle["preprocess"]
    model = bundle["model"]
    thr = float(bundle["threshold"])

    # Transform raw features to the numeric matrix the model expects
    X = preprocess.transform(raw_df)  # should return a 2D array/matrix

    # Predict probabilities; take positive class (index 1 is common for binary)
    proba = model.predict_proba(X)[:, 1]
    pred = (proba >= thr).astype(int)

    return np.c_[proba, pred]

# ---- output formatting ----
def output_fn(prediction, accept: str):
    """
    Return CSV ('proba,pred') by default. Support JSON on request.
    """
    accept = (accept or "text/csv").lower().strip()

    if accept.startswith("application/json"):
        out = [{"proba": float(p[0]), "pred": int(p[1])} for p in prediction]
        body = json.dumps(out)
        return body, "application/json"

    # CSV default
    buf = io.StringIO()
    buf.write("proba,pred\n")
    for p in prediction:
        buf.write(f"{float(p[0]):.6f},{int(p[1])}\n")
    return buf.getvalue(), "text/csv"


Step 2 — Resolve latest Approved package

Use a Jupyter notebook, following the same pattern as your prior labs: first we discover the latest Approved package and pull the image, model artifact, and evaluation.json (for the threshold). We’ll reuse these values in the submit script.

  • Create a new Jupyter Notebook - test_realtime.ipynb
# Lab 9 / Step 2 – Discover the latest Approved package and core URIs

import os, boto3
from sagemaker import Session

REGION       = os.environ.get("AWS_REGION", "ap-northeast-2")
BUCKET       = os.environ["BUCKET"]
LABP         = os.environ.get("LAB_PREFIX", "student")
SM_ROLE_ARN  = os.environ["SM_ROLE_ARN"]
S3_ARTIFACTS = os.environ["S3_ARTIFACTS"].rstrip("/")

boto_sess = boto3.Session(region_name=REGION)
sm = boto_sess.client("sagemaker")
sess = Session(boto_session=boto_sess)

def latest_approved_pkg_arn(group: str) -> str:
    r = sm.list_model_packages(
        ModelPackageGroupName=group,
        ModelApprovalStatus="Approved",
        SortBy="CreationTime", SortOrder="Descending", MaxResults=1
    )
    items = r.get("ModelPackageSummaryList", [])
    if not items:
        raise SystemExit("No Approved model package. Approve one in Lab 7.")
    return items[0]["ModelPackageArn"]

def describe_package_bits(pkg_arn: str):
    info = sm.describe_model_package(ModelPackageName=pkg_arn)
    c = info["InferenceSpecification"]["Containers"][0]
    return c["Image"], c["ModelDataUrl"], info["ModelMetrics"]["ModelQuality"]["Statistics"]["S3Uri"]

GROUP = f"{LABP}-telco-churn"
PKG_ARN = latest_approved_pkg_arn(GROUP)
IMAGE_URI, MODEL_DATA_URL, EVAL_JSON_S3 = describe_package_bits(PKG_ARN)

print("Image:", IMAGE_URI)
print("Model:", MODEL_DATA_URL)
print("Eval :", EVAL_JSON_S3)
print("Pre  :", f"{S3_ARTIFACTS}/preprocess/preprocess.joblib")

Step 3 — Deploy a real-time endpoint (Jupyter cell)

We attach inference_rt.py and pass three env values:

  • PREPROCESS_JOBLIB_S3 → s3://…/preprocess/preprocess.joblib
  • EVAL_JSON_S3 → evaluation metrics
  • FEATURE_LIST_JSON → optional parity with earlier labs (not required for RT path)
# Lab 9 / Step 3 – Deploy/update the endpoint from the notebook

from sagemaker.sklearn.model import SKLearnModel
import logging
logging.getLogger("sagemaker").setLevel(logging.WARNING)

ENDPOINT_NAME = f"{LABP}-rt-endpoint"

sk_rt = SKLearnModel(
    model_data=MODEL_DATA_URL,
    image_uri=IMAGE_URI,
    role=SM_ROLE_ARN,
    entry_point="inference_rt.py",
    source_dir="/home/ec2-user/mlops-day2/sagemaker/code",
    sagemaker_session=sess,
    code_location=f"{S3_ARTIFACTS}/code",
    env={
        "PREPROCESS_JOBLIB_S3": f"{S3_ARTIFACTS}/preprocess/preprocess.joblib",
        "EVAL_JSON_S3": EVAL_JSON_S3,
        "FEATURE_LIST_JSON": f"{S3_ARTIFACTS}/preprocess/columns.json",  # optional
    },
)

predictor = sk_rt.deploy(
    endpoint_name=ENDPOINT_NAME,
    instance_type="ml.m5.large",
    initial_instance_count=1,
    update_endpoint=True,   # update if it exists
    wait=True,              # block until InService
)

print("✓ Endpoint InService:", ENDPOINT_NAME)

Tip: View the creation of the endpoint from AWS Console > SageMaker AI > Inference > Endpoints


Step 4 — Smoke test (CSV or JSON) (Jupyter cell)

We’ll grab a tiny sample (5 rows) from your raw telco CSV (if you keep one), or fall back to your processed test split (the preprocess can usually handle both, but raw is the point here).

# Lab 9 / Step 4 – Invoke with CSV or JSON (choose one) from the notebook

import pandas as pd
from io import StringIO
s3 = boto_sess.client("s3")   # not created in the Step 2 cell; needed below
rt = boto_sess.client("sagemaker-runtime")

def s3_get_text(bucket, key):
    return s3.get_object(Bucket=bucket, Key=key)["Body"].read().decode("utf-8")

# --- CSV path (header required) ---
# Example: try to find a small raw file you staged (adjust as needed)
csv_key = None
for pfx in ["data/raw/", "data/processed/", "artifacts/preprocess/"]:
    pages = s3.get_paginator("list_objects_v2").paginate(Bucket=BUCKET, Prefix=pfx)
    for page in pages:
        for o in page.get("Contents", []):
            if o["Key"].endswith(".csv"):
                csv_key = o["Key"]
                break
        if csv_key: break
    if csv_key: break

if not csv_key:
    raise SystemExit("No CSV found under data/raw/, data/processed/, or artifacts/preprocess/.")

text = s3_get_text(BUCKET, csv_key)
df = pd.read_csv(StringIO(text)).head(5)
csv_payload = df.to_csv(index=False)

resp = rt.invoke_endpoint(
    EndpointName=ENDPOINT_NAME,
    ContentType="text/csv",
    Accept="text/csv",
    Body=csv_payload.encode("utf-8"),
)
print(resp["Body"].read().decode("utf-8"))

# --- JSON path (alternative) ---
# records = df.to_dict(orient="records")
# resp = rt.invoke_endpoint(
#     EndpointName=ENDPOINT_NAME,
#     ContentType="application/json",
#     Accept="application/json",
#     Body=json.dumps({"instances": records}).encode("utf-8"),
# )
# print(resp["Body"].read().decode("utf-8"))

Why not a browser call? Endpoints require SigV4 signed requests (AWS credentials). Use Boto3 (above) or AWS CLI (below). A bare curl won’t work unless you implement SigV4 signing.
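The CLI call below reads sample.csv from disk. One way to produce it is a small helper like this (hypothetical; `telco.csv` stands in for whatever raw CSV you have locally):

```python
import pandas as pd

def make_sample(src_csv: str, dst_csv: str = "sample.csv", n: int = 5) -> None:
    """Write the first n rows (plus header) for a quick endpoint smoke test."""
    # Keep the header row: input_fn requires raw column names.
    pd.read_csv(src_csv).head(n).to_csv(dst_csv, index=False)
```

For example, `make_sample("telco.csv")` writes a 5-row sample.csv ready for the invoke call.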

CLI example (CSV):

aws sagemaker-runtime invoke-endpoint \
  --endpoint-name "$LAB_PREFIX-rt-endpoint" \
  --content-type text/csv --accept text/csv \
  --body fileb://sample.csv out.csv

head out.csv

Step 5 — Cleanup (stop billing) (Jupyter cell)

# Lab 9 / Step 5 – Delete endpoint when done
from botocore.exceptions import ClientError

try:
    sm.delete_endpoint(EndpointName=ENDPOINT_NAME)
    sm.get_waiter("endpoint_deleted").wait(EndpointName=ENDPOINT_NAME)
    print("✓ Deleted:", ENDPOINT_NAME)
except ClientError as e:
    print("Delete skipped/failed:", e)

Scripts (pipeline-friendly pattern)

Same as your other labs: put runnable helpers in scripts/. These call the same logic you explored in the notebook.

A) scripts/submit_realtime_endpoint.py (create/update)

# scripts/submit_realtime_endpoint.py
"""
Deploy or update a SageMaker real-time endpoint using the latest Approved model package.
Attaches sagemaker/code/inference_rt.py and uses your *project* bucket for code tarballs.

Env: AWS_REGION, LAB_PREFIX, BUCKET, S3_ARTIFACTS, SM_ROLE_ARN
     (optional) SM_RT_INSTANCE_TYPE, SM_RT_INSTANCE_COUNT
"""

import os, boto3, logging
from sagemaker import Session
from sagemaker.sklearn.model import SKLearnModel

REGION       = os.environ.get("AWS_REGION", "ap-northeast-2")
BUCKET       = os.environ["BUCKET"]
LABP         = os.environ.get("LAB_PREFIX", "student")
SM_ROLE_ARN  = os.environ["SM_ROLE_ARN"]
S3_ARTIFACTS = os.environ["S3_ARTIFACTS"].rstrip("/")

ENDPOINT_NAME  = os.environ.get("SM_RT_ENDPOINT_NAME", f"{LABP}-rt-endpoint")
INSTANCE_TYPE  = os.environ.get("SM_RT_INSTANCE_TYPE", "ml.m5.large")
INSTANCE_COUNT = int(os.environ.get("SM_RT_INSTANCE_COUNT", "1"))

logging.getLogger("sagemaker").setLevel(logging.WARNING)
logging.getLogger("botocore").setLevel(logging.WARNING)

boto_sess = boto3.Session(region_name=REGION)
sm = boto_sess.client("sagemaker")
sess = Session(boto_session=boto_sess)

def latest_approved_pkg_arn(group: str) -> str:
    r = sm.list_model_packages(
        ModelPackageGroupName=group,
        ModelApprovalStatus="Approved",
        SortBy="CreationTime", SortOrder="Descending", MaxResults=1
    )
    items = r.get("ModelPackageSummaryList", [])
    if not items:
        raise SystemExit("No Approved model package. Approve one in Lab 7.")
    return items[0]["ModelPackageArn"]

def describe_package_bits(pkg_arn: str):
    info = sm.describe_model_package(ModelPackageName=pkg_arn)
    c = info["InferenceSpecification"]["Containers"][0]
    return c["Image"], c["ModelDataUrl"], info["ModelMetrics"]["ModelQuality"]["Statistics"]["S3Uri"]

def main():
    group = f"{LABP}-telco-churn"
    pkg_arn = latest_approved_pkg_arn(group)
    image_uri, model_data_url, eval_json_s3 = describe_package_bits(pkg_arn)

    sk_rt = SKLearnModel(
        model_data=model_data_url,
        image_uri=image_uri,
        role=SM_ROLE_ARN,
        entry_point="inference_rt.py",
        source_dir="sagemaker/code",
        sagemaker_session=sess,
        code_location=f"{S3_ARTIFACTS}/code",
        env={
            "PREPROCESS_JOBLIB_S3": f"{S3_ARTIFACTS}/preprocess/preprocess.joblib",
            "EVAL_JSON_S3": eval_json_s3,
            "FEATURE_LIST_JSON": f"{S3_ARTIFACTS}/preprocess/columns.json",
        },
    )

    sk_rt.deploy(
        endpoint_name=ENDPOINT_NAME,
        instance_type=INSTANCE_TYPE,
        initial_instance_count=INSTANCE_COUNT,
        update_endpoint=True,
        wait=True,
    )

    print("Endpoint:", ENDPOINT_NAME)
    print("InstanceType:", INSTANCE_TYPE)
    print("InstanceCount:", INSTANCE_COUNT)
    print("Status: InService")

if __name__ == "__main__":
    main()

B) scripts/invoke_realtime_smoke.py (CSV/JSON via --format)

# scripts/invoke_realtime_smoke.py
"""
Invoke the real-time endpoint with a tiny payload.
Usage:
  python scripts/invoke_realtime_smoke.py --format csv
  python scripts/invoke_realtime_smoke.py --format json

Env: AWS_REGION, BUCKET, LAB_PREFIX (optional), SM_RT_ENDPOINT_NAME (optional)
"""
import os, argparse, json, boto3
from io import StringIO
import pandas as pd

REGION   = os.environ.get("AWS_REGION", "ap-northeast-2")
BUCKET   = os.environ["BUCKET"]
LABP     = os.environ.get("LAB_PREFIX", "student")
ENDPOINT = os.environ.get("SM_RT_ENDPOINT_NAME", f"{LABP}-rt-endpoint")

boto_sess = boto3.Session(region_name=REGION)
s3 = boto_sess.client("s3")
rt = boto_sess.client("sagemaker-runtime")

def find_any_csv(bucket: str) -> str:
    for prefix in ["data/raw/", "data/processed/", "artifacts/preprocess/"]:
        for page in s3.get_paginator("list_objects_v2").paginate(Bucket=bucket, Prefix=prefix):
            for o in page.get("Contents", []):
                if o["Key"].endswith(".csv"):
                    return o["Key"]
    raise SystemExit("No CSV found under data/raw/, data/processed/, or artifacts/preprocess/.")

def main():
    ap = argparse.ArgumentParser()
    ap.add_argument("--format", choices=["csv", "json"], default="csv",
                    help="Payload format to send.")
    args = ap.parse_args()

    key = find_any_csv(BUCKET)
    body = s3.get_object(Bucket=BUCKET, Key=key)["Body"].read().decode("utf-8")
    df = pd.read_csv(StringIO(body)).head(5)

    if args.format == "json":
        payload = json.dumps({"instances": df.to_dict(orient="records")}).encode("utf-8")
        ct, accept = "application/json", "application/json"
    else:
        payload = df.to_csv(index=False).encode("utf-8")
        ct, accept = "text/csv", "text/csv"

    resp = rt.invoke_endpoint(
        EndpointName=ENDPOINT,
        ContentType=ct,
        Accept=accept,
        Body=payload,
    )
    print(resp["Body"].read().decode("utf-8"))

if __name__ == "__main__":
    main()

C) scripts/delete_realtime_endpoint.py (stop billing)

# scripts/delete_realtime_endpoint.py
"""
Delete the named endpoint.
Env: AWS_REGION, LAB_PREFIX (optional), SM_RT_ENDPOINT_NAME (optional)
"""
import os, boto3
from botocore.exceptions import ClientError

REGION = os.environ.get("AWS_REGION", "ap-northeast-2")
LABP   = os.environ.get("LAB_PREFIX", "student")
NAME   = os.environ.get("SM_RT_ENDPOINT_NAME", f"{LABP}-rt-endpoint")

boto_sess = boto3.Session(region_name=REGION)
sm = boto_sess.client("sagemaker")

def main():
    try:
        sm.delete_endpoint(EndpointName=NAME)
        sm.get_waiter("endpoint_deleted").wait(EndpointName=NAME)
        print("Deleted:", NAME)
    except ClientError as e:
        print("Skip/Failed:", e)

if __name__ == "__main__":
    main()


Run from terminal (your standard pattern)

# 0) shared env
source ~/mlops-env.sh

# 1) deploy/update the endpoint
python scripts/submit_realtime_endpoint.py

# 2) smoke test (CSV)
python scripts/invoke_realtime_smoke.py --format csv

# (or) smoke test (JSON)
python scripts/invoke_realtime_smoke.py --format json

# 3) stop billing when done
python scripts/delete_realtime_endpoint.py


Version control

git add sagemaker/code/inference_rt.py \
        scripts/submit_realtime_endpoint.py \
        scripts/invoke_realtime_smoke.py \
        scripts/delete_realtime_endpoint.py
git commit -m "Lab 9: real-time endpoint with raw→preprocess→score, threshold from eval.json"
git push

Mini-glossary

  • Endpoint (SageMaker): Managed HTTPS API for inference; requires AWS SigV4; billed while running.
  • Model (SageMaker): Container image + model artifact + (optional) your code.
  • EndpointConfig: Deployment recipe (instance type/count, variants).
  • Preprocess-at-inference: Load your fitted feature pipeline and apply it to raw inputs before scoring; reduces train/serve skew.
  • Decision threshold: Converts probability to class; sourced from evaluation time for consistent behavior.
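The last bullet in code form (the threshold value here is illustrative; in our labs it comes from evaluation.json):

```python
import numpy as np

proba = np.array([0.20, 0.61, 0.45])     # model.predict_proba output for the positive class
threshold = 0.6                          # illustrative; sourced from evaluation.json in the labs
pred = (proba >= threshold).astype(int)  # probability -> class
print(pred)                              # [0 1 0]
```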

Why this design

  • It mirrors real teams: batch for backfills/schedules (Lab 8/10), real-time for low-latency calls (Lab 9).
  • The server enforces consistent thresholding using evaluation.json.
  • Keeping preprocess.joblib in your project bucket makes it easy to rotate/improve preprocessing independently of the model packaging format in training.

Next Lab

Lab 10 — Pipelines: we’ll wire registration → approval → (batch and/or real-time deploy) into a single CI/CD flow. We’ll also document the two robustness patterns you’ve been discussing:

  • Pattern A: “Preprocess-in-container” (preferred for resilience to drift).
  • Pattern B: Strict upstream preprocessing (pushes a clean, contracted dataset to training/inference).

Both will rely on the same contract artifacts (columns.json, schema.json) you’ve been curating.
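For reference, the contract artifact is just a small JSON file published by preprocessing. A minimal sketch with a hypothetical shape (the actual keys are whatever your Lab 4 preprocessing step wrote):

```python
import json

# Hypothetical contract shape; match whatever your preprocessing step actually writes.
contract = {
    "features": ["tenure", "MonthlyCharges", "Contract"],  # training-time raw columns
    "label": "Churn",                                      # dropped at inference
}
with open("columns.json", "w") as f:
    json.dump(contract, f, indent=2)
```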

↑ Back to top


Lab 10 — SageMaker Pipeline

Batch Transform with Schema Guard


What you’ll build (and why)

We’ll create a SageMaker Pipeline that runs a Batch Transform of your latest Approved model against a CSV in S3. Before we spend money on compute, we run a schema smoke test that compares the incoming CSV header to your contract (the columns.json produced in preprocessing). That gives us resilience against data-source drift:

  • Extra columns → allowed (we drop them in input_fn, just as in Lab 8).
  • Missing required columns → hard fail (catch early; don’t burn transform time).
  • Order differences → harmless (we re-order based on columns.json).
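Those three rules reduce to a small header comparison. A sketch of the idea (the full schema_guard.py in Step 1 additionally reads its inputs from S3 and writes a JSON report):

```python
def check_header(header, required, allow_extras=True):
    """Compare an incoming CSV header against the contract's required columns."""
    missing = [c for c in required if c not in header]
    extras = [c for c in header if c not in required]
    if missing:
        raise SystemExit(f"Missing required columns: {missing}")   # hard fail, before compute
    if extras and not allow_extras:
        raise SystemExit(f"Extra columns not allowed: {extras}")
    # Order differences are harmless: inference re-orders from columns.json anyway.
    order_ok = [c for c in header if c in required] == required
    return {"extras": extras, "order_ok": order_ok}
```

For example, `check_header(["tenure", "note", "Churn"], ["tenure", "Churn"])` passes and merely reports the extra column, while a header missing `tenure` fails the step immediately.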

Production Patterns

👉 In AWS SageMaker Pipelines, you can chain Processing steps (for preprocessing) with Transform steps (for batch inference). This is the pattern to use when the ML team owns preprocessing.

👉 But many companies push preprocessing upstream to a Feature Store (or a data platform), so the ML pipeline only sees pre-curated features. This gives a clean separation between Data Engineering and Data Science/ML, and often reduces cost as well. That’s why our Lab 8 pipeline assumes the columns.json contract is available and models the feature-store pattern.

👉 In this lab, we assume the feature-store pattern has been used. Even within this pattern, however, a robust system should not assume the data presented for scoring is fail-safe and bug-free. To deal with that situation, there are two patterns.

Two production patterns (context)

  • Pattern A – Preprocess-in-container (preferred for robustness)

    Your inference script enforces/aligns the header to the contract at runtime. We’re already doing this (Lab 8 code).

  • Pattern B – Offline preprocess then score

    Pipelines generate a cleaned test split and pass it forward. Simple but brittle if upstream drifts.

In this lab we still use Pattern A at inference time, but we add a schema guard step to fail fast when required columns are missing.

Why do we have Lab 8 and Lab 10?

We could put most guard logic in inference.py and still run Batch. The reason we present both Lab 8 and Lab 10 is to teach how/where that logic runs and what you get from an orchestrated, versioned pipeline.

In Lab 8, we learn how to create a batch scoring engine

  • Mechanics + debugging: How to wire a container, model artifact, and Batch Transform end-to-end, fast. Perfect for proving the contract and iterating on input_fn/output_fn.
  • Operator control: You directly pick inputs and kick off the job from a notebook. Great for “one-off” scoring, smoke testing, and rapid feedback loops.
  • Lower ceremony: No pipeline scaffolding, approvals, or parameter plumbing to slow you down while you’re still shaping the contract.

In Lab 10, we learn about creating SageMaker Pipelines to sequence tasks

  • Orchestration & reproducibility: Every run is an immutable, parameterized DAG with versioned inputs/outputs. Anyone can re-run “the same thing” later.
  • Policy before compute (why guards live in the pipeline):
    • The schema guard runs before provisioning transform instances. If input is malformed, you fail early and cheaply—instead of paying for instances to boot and then fail inside inference.py.
    • The guard is centralized & auditable. Rather than burying rules in arbitrary user code, it’s a first-class step with its own logs, pass/fail signal, and lineage. Compliance teams love this.
    • Consistent enforcement across models/teams: the guard step can be reused as a standard component.
  • Conditional control flow (ConditionStep): cleanly route to:
    • Transform if guard passes,
    • Stop/notify if it fails (e.g., post a Slack/SNS with the reason).
  • Parameterization: swap datasets, instance types, thresholds, and destinations via Parameters—no code edits.
  • Lineage & traceability: each step contributes artifacts to SageMaker lineage—who ran what, with which data, and where outputs landed.
  • Caching: unchanged steps (e.g., guard on an identical manifest) can be skipped to save time/money.
  • Automation hooks: CI/CD can trigger pipelines on approvals, new data, or schedule; results can notify downstream systems.
  • Separation of concerns:
    • inference.py focuses on robust parsing & model execution.
    • The pipeline handles pre-entry validation, orchestration, and fail-fast policy.

Files we’ll add

  • sagemaker/code/schema_guard.py – tiny Processing script (our smoke test)
  • scripts/submit_pipeline_batch.py – defines, upserts, and runs the pipeline

Both use the same env convention as your other labs and keep all artifacts in your project bucket (not the default SM bucket).
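Before running either script, it can help to sanity-check that environment contract up front. A minimal sketch (the helper name `missing_env` is illustrative, not part of the lab files; the variable names match the list in `submit_pipeline_batch.py`'s docstring):

```python
import os

# Variables the Lab 10 scripts read (sourced from ~/mlops-env.sh).
REQUIRED_VARS = ["AWS_REGION", "LAB_PREFIX", "BUCKET", "S3_ARTIFACTS", "SM_ROLE_ARN"]

def missing_env(env=None):
    """Return names of required variables that are unset or empty."""
    env = os.environ if env is None else env
    return [k for k in REQUIRED_VARS if not env.get(k)]
```

Calling `missing_env()` with no argument checks `os.environ`, so you can fail fast with a clear message before any AWS call is made.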


Step 1 — Schema Guard (smoke test) script

Create sagemaker/code/schema_guard.py:

# sagemaker/code/schema_guard.py
"""
Schema guard "smoke test" for batch input.

This script is meant to run inside a SageMaker **ProcessingStep** as part of your pipeline.
It checks whether the incoming CSV data matches the schema contract defined in `columns.json`.

- Confirms all **required feature columns** exist (ignores 'label' because we don't need it at inference).
- Reports **extra columns** and whether the column **order** matches.
- Exits with a non-zero code if required columns are missing (this fails the pipeline fast).
- Respects `--allow-extras`: if set to true, extra columns are tolerated (but still reported).

Inputs (via CLI arguments preferred; environment variable fallbacks kept for backward compatibility):
  --s3-input s3://bucket/path/to/input.csv
  --contract-json s3://bucket/path/to/columns.json
  --allow-extras true|false

Outputs:
  A JSON report `/opt/ml/processing/output/schema_check.json`
  This is picked up by the pipeline via a PropertyFile.
"""

import argparse
import json
import os
import sys
from typing import List, Tuple
import boto3


# --------------------------------------------------------------------
# Helper functions for working with S3
# --------------------------------------------------------------------

def _is_s3(uri: str) -> bool:
    """Quick check: does this string look like an S3 URI?"""
    return uri.startswith("s3://")


def _split_s3(uri: str) -> Tuple[str, str]:
    """Split s3://bucket/key into (bucket, key)."""
    if not _is_s3(uri):
        raise ValueError(f"Not an s3 URI: {uri}")
    b, k = uri[5:].split("/", 1)
    return b, k


def _read_s3_text_first_line(s3_uri: str, session=None) -> str:
    """
    Read just the **first line** of an S3 object as text (utf-8).
    Used to peek at the CSV header row without downloading the entire file.
    """
    if session is None:
        session = boto3.Session()
    s3 = session.client("s3")
    bucket, key = _split_s3(s3_uri)
    # Request only the first ~1 MB (enough for a header line)
    obj = s3.get_object(Bucket=bucket, Key=key, Range="bytes=0-1048576")
    body = obj["Body"].read().decode("utf-8", errors="replace")
    return body.splitlines()[0] if body else ""


def _read_s3_json(s3_uri: str, session=None) -> dict:
    """
    Read and parse a JSON file from S3 into a Python dict.
    Used for loading columns.json.
    """
    if session is None:
        session = boto3.Session()
    s3 = session.client("s3")
    bucket, key = _split_s3(s3_uri)
    obj = s3.get_object(Bucket=bucket, Key=key)
    txt = obj["Body"].read().decode("utf-8")
    return json.loads(txt)


# --------------------------------------------------------------------
# Core schema validation logic
# --------------------------------------------------------------------

def _normalize_columns(obj: dict) -> List[str]:
    """
    Normalize the contract file into a list of column names.

    Expected shapes:
      {"columns": ["f0","f1",...,"label"]}
    or simply:
      ["f0","f1",...,"label"]

    Always returns a list of strings.
    """
    if isinstance(obj, dict) and "columns" in obj and isinstance(obj["columns"], list):
        return [str(c) for c in obj["columns"]]
    if isinstance(obj, list):
        return [str(c) for c in obj]
    raise ValueError("columns.json must be either {'columns': [...]} or a JSON list of column names")


def _detect_header(first_line: str, expected: List[str]) -> Tuple[bool, List[str]]:
    """
    Heuristically decide if the first line looks like a header.

    Rules:
      - If ≥50% of tokens overlap with expected names, treat as header.
      - Or if it contains 'label' or any 'f<digit>' tokens, treat as header.
    Returns:
      (has_header: bool, tokens: list[str])
    """
    if not first_line:
        return False, []
    toks = [t.strip() for t in first_line.strip().split(",")]
    if not toks:
        return False, []

    exp_set = set(expected)
    overlap = len([t for t in toks if t in exp_set])

    if overlap >= max(1, int(0.5 * len(exp_set))):
        return True, toks
    if any(t == "label" or (t.startswith("f") and t[1:].isdigit()) for t in toks):
        return True, toks
    return False, toks


def _analyze(header_cols: List[str], contract_cols: List[str], allow_extras: bool) -> dict:
    """
    Compare the header against the contract.

    - Required columns = all contract columns except 'label'.
    - If any required column is missing → error.
    - If extras found and not allowed → error.
    - Otherwise, return status "ok" (but still report extras).
    """
    required = [c for c in contract_cols if c != "label"]
    required_set = set(required)
    header_set = set(header_cols)

    missing = sorted(list(required_set - header_set))
    extras = sorted(list(header_set - required_set))

    # Check if order matches (only across required cols, label ignored)
    required_in_header = [c for c in header_cols if c in required_set]
    order_matches = required_in_header == required

    status = "ok"
    error = None
    if missing:
        status = "error"
        error = f"Missing required columns: {missing}"
    elif extras and not allow_extras:
        status = "error"
        error = f"Extra columns present (and not allowed): {extras}"

    return {
        "status": status,
        "error": error,
        "required_count": len(required),
        "header_count": len(header_cols),
        "missing_required": missing,
        "extra_columns": extras,
        "order_matches": order_matches,
        "checked_against": required,  # required list (label excluded)
    }


# --------------------------------------------------------------------
# Main entrypoint
# --------------------------------------------------------------------

def main():
    # ----------------- Parse arguments -----------------
    parser = argparse.ArgumentParser(description="Schema guard for batch CSV input")
    parser.add_argument("--s3-input", dest="s3_input", default=os.environ.get("S3_INPUT"))
    parser.add_argument("--contract-json", dest="contract_json", default=os.environ.get("CONTRACT_JSON"))
    parser.add_argument(
        "--allow-extras",
        dest="allow_extras",
        default=os.environ.get("ALLOW_EXTRAS", "true"),
        choices=["true", "false", "True", "False", "1", "0"],
    )
    args = parser.parse_args()

    # Safety: required args must be provided
    if not args.s3_input or not args.contract_json:
        print("ERROR: --s3-input and --contract-json are required (or set env vars).", file=sys.stderr)
        sys.exit(2)

    allow_extras = str(args.allow_extras).strip().lower() in ("true", "1", "yes")

    # ----------------- Load contract -----------------
    session = boto3.Session()
    try:
        contract_raw = _read_s3_json(args.contract_json, session=session)
        contract_cols = _normalize_columns(contract_raw)
    except Exception as e:
        print(f"ERROR: Failed to read/parse contract JSON {args.contract_json}: {e}", file=sys.stderr)
        sys.exit(2)

    # ----------------- Peek at header -----------------
    try:
        first_line = _read_s3_text_first_line(args.s3_input, session=session)
    except Exception as e:
        print(f"ERROR: Failed to read first line from {args.s3_input}: {e}", file=sys.stderr)
        sys.exit(2)

    has_header, toks = _detect_header(first_line, contract_cols)

    # Start building the result report
    result = {
        "input": args.s3_input,
        "contract": args.contract_json,
        "allow_extras": allow_extras,
        "detected_header": has_header,
        "first_line_tokens": toks[:50],  # cap to avoid logging huge lines
    }

    if not has_header:
        # No header detected = we can’t validate column names safely
        result.update({
            "status": "error",
            "error": "No header detected in first line of CSV. Provide a header row that matches columns.json.",
        })
    else:
        # Perform detailed schema analysis
        analysis = _analyze(header_cols=toks, contract_cols=contract_cols, allow_extras=allow_extras)
        result.update(analysis)

    # ----------------- Write report -----------------
    # Processing containers expect outputs under /opt/ml/processing/output
    out_dir = os.environ.get("SM_OUTPUT_DATA_DIR", "/opt/ml/processing/output")
    os.makedirs(out_dir, exist_ok=True)
    out_path = os.path.join(out_dir, "schema_check.json")
    with open(out_path, "w", encoding="utf-8") as f:
        json.dump(result, f, indent=2, ensure_ascii=False)

    # ----------------- Console logs -----------------
    print(f"[schema-guard] Input:     {args.s3_input}")
    print(f"[schema-guard] Contract:  {args.contract_json}")
    print(f"[schema-guard] Header?:   {result['detected_header']}")
    if result["detected_header"]:
        print(f"[schema-guard] Missing:   {result.get('missing_required', [])}")
        print(f"[schema-guard] Extras:    {result.get('extra_columns', [])} (allow_extras={allow_extras})")
        print(f"[schema-guard] Order OK:  {result.get('order_matches')}")
    print(f"[schema-guard] Status:    {result['status']}")
    print(f"[schema-guard] Report ->  {out_path}")

    # ----------------- Exit code semantics -----------------
    if result["status"] != "ok":
        # Non-zero exit signals the pipeline to fail immediately
        sys.exit(1)


if __name__ == "__main__":
    main()
Why a smoke test? It’s a cheap, fast check to catch obvious problems (e.g., missing columns) before we run expensive steps. It improves reliability and cost control.
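The fail-fast mechanism here is nothing more than process exit codes: SageMaker marks a Processing job failed when the script exits non-zero. A standalone sketch (plain Python, no AWS involved) of that contract:

```python
import subprocess
import sys

# The guard's contract: exit 0 = schema OK; any non-zero exit fails the
# ProcessingStep (and with it the pipeline) before Transform instances boot.
ok = subprocess.run([sys.executable, "-c", "import sys; sys.exit(0)"])
bad = subprocess.run([sys.executable, "-c", "import sys; sys.exit(1)"])
print(ok.returncode, bad.returncode)  # 0 1
```

This is why `schema_guard.py` ends with `sys.exit(1)` when the status is not "ok": the return code is the signal the platform observes, independent of anything printed to the logs.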

Step 2 — The pipeline submitter (define + upsert + run)

Create scripts/submit_pipeline_batch.py:

# scripts/submit_pipeline_batch.py
"""
Lab 10 — Submit a SageMaker Pipeline for Batch Scoring (with a schema "smoke test")

What this script does (end-to-end):
  1) Creates / updates ("upserts") a SageMaker Pipeline composed of three steps:
       a) ProcessingStep  : runs a tiny schema guard (fast fail if columns are wrong)
       b) ModelStep       : creates a SageMaker Model from the latest *Approved* package
       c) TransformStep   : launches a Batch Transform job to score the input CSV
  2) Starts the pipeline and waits for completion (quiet waiter; no log streaming)
  3) Prints a deterministic S3 output prefix containing the predictions

Environment required (source ~/mlops-env.sh first):
  - AWS_REGION     : e.g., ap-northeast-2
  - LAB_PREFIX     : e.g., student  (used to name resources)
  - BUCKET         : your project bucket (NOT the SageMaker default bucket)
  - S3_ARTIFACTS   : s3://<BUCKET>/artifacts   (single root for all lab artifacts)
  - SM_ROLE_ARN    : execution role for SageMaker

Design choices to note:
  - We resolve the *latest Approved* model package **at submit time** (boto3 call).
  - We pass the feature contract (columns.json) to the container via ENV.
  - We give the pipeline an ExecId stamp so the output S3 path is known in advance.
  - We configure the SageMaker **PipelineSession** with a custom default bucket
    (your project BUCKET), so *code tarballs* for steps land in your bucket
    instead of the "sagemaker-<region>-<acct>" default.
"""

import os
import time
import boto3
from datetime import datetime

# --- SageMaker Workflow primitives ---
from sagemaker.workflow.parameters import ParameterString, ParameterBoolean
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.functions import Join, JsonGet
from sagemaker.workflow.steps import ProcessingStep, TransformStep
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import ConditionEquals
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.properties import PropertyFile

# --- Process / Model / Transform helpers ---
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingOutput
from sagemaker.sklearn.model import SKLearnModel
from sagemaker.transformer import Transformer

# --- Force *our* bucket as the session's default (no SageMaker default bucket) ---
from sagemaker.session import SessionSettings

# =============================================================================
# Environment + clients
# =============================================================================
REGION       = os.environ.get("AWS_REGION", "ap-northeast-2")
BUCKET       = os.environ["BUCKET"]                            # project bucket (your own)
LABP         = os.environ.get("LAB_PREFIX", "student")
SM_ROLE_ARN  = os.environ["SM_ROLE_ARN"]
S3_ARTIFACTS = os.environ["S3_ARTIFACTS"].rstrip("/")          # e.g., s3://<bucket>/artifacts

boto_sess = boto3.Session(region_name=REGION)
sm = boto_sess.client("sagemaker")
s3 = boto_sess.client("s3")

# IMPORTANT: this is what prevents uploads to the SageMaker default bucket.
# Any step (Processing/Training/Transform/Model upload) that relies on the
# session default bucket will now use YOUR project bucket.
pipe_sess = PipelineSession(
    boto_session=boto_sess,
    default_bucket=BUCKET
)

# =============================================================================
# Helpers: resolve the latest Approved model package + pick out fields we need
# =============================================================================
def latest_approved_pkg(group: str) -> str:
    """Return the ARN of the newest Approved model package in the given package group."""
    r = sm.list_model_packages(
        ModelPackageGroupName=group,
        ModelApprovalStatus="Approved",
        SortBy="CreationTime",
        SortOrder="Descending",
        MaxResults=1,
    )
    lst = r.get("ModelPackageSummaryList", [])
    if not lst:
        raise SystemExit("No Approved model package. Approve one in Lab 7.")
    return lst[0]["ModelPackageArn"]


def describe_pkg_bits(pkg_arn: str):
    """
    Extract:
      - image_uri      : inference container image
      - model_data_url : model.tar.gz in S3
      - eval_json_s3   : (optional) metrics JSON from Model Registry
    """
    info = sm.describe_model_package(ModelPackageName=pkg_arn)
    c = info["InferenceSpecification"]["Containers"][0]
    image_uri = c["Image"]
    model_data_url = c["ModelDataUrl"]
    # Optional; parity with Lab 8. Your inference.py tolerates it missing.
    eval_json_s3 = (
        info.get("ModelMetrics", {})
            .get("ModelQuality", {})
            .get("Statistics", {})
            .get("S3Uri")
    )
    return image_uri, model_data_url, eval_json_s3


# =============================================================================
# Build the Pipeline definition (no jobs started yet)
# =============================================================================
def build_pipeline() -> Pipeline:
    # Resolve Model Registry bits now (author-time / submit-time)
    group = f"{LABP}-telco-churn"
    pkg_arn = latest_approved_pkg(group)
    image_uri, model_data_url, eval_json_s3 = describe_pkg_bits(pkg_arn)

    # ---------------------------
    # User-tunable pipeline inputs
    # ---------------------------
    p_input_s3 = ParameterString(
        name="InputS3",
        default_value=f"s3://{BUCKET}/data/processed/test/test.csv",  # small file for demo speed
    )
    p_feature_json = ParameterString(
        name="FeatureListJson",
        default_value=f"{S3_ARTIFACTS}/preprocess/columns.json",      # contract produced earlier
    )
    p_allow_extras = ParameterBoolean(
        name="AllowExtraColumns",
        default_value=True,                                           # extras are tolerated by guard
    )
    p_instance_type = ParameterString(
        name="TransformInstanceType",
        default_value="ml.m5.large",
    )
    p_exec_id = ParameterString(
        name="ExecId",
        default_value=datetime.utcnow().strftime("%Y%m%d-%H%M%S"),    # stamp so output path is known now
    )
    p_output_prefix = ParameterString(
        name="OutputPrefix",
        default_value=f"{S3_ARTIFACTS}/batch/",                        # we Join this with ExecId below
    )

    # =========================================================================
    # Step 1 — ProcessingStep: run a tiny "schema guard" (fast fail)
    # =========================================================================
    # We use the prebuilt scikit-learn Processing image only as a Python runner.
    guard = SKLearnProcessor(
        framework_version="1.2-1",
        role=SM_ROLE_ARN,
        instance_type="ml.m5.large",
        instance_count=1,
        sagemaker_session=pipe_sess,  # uses YOUR bucket for code uploads
    )

    # Where the guard writes its JSON verdict *inside* the container.
    # schema_guard.py must emit: /opt/ml/processing/output/schema_check.json
    schema_report = PropertyFile(
        name="SchemaReport",
        output_name="output",
        path="schema_check.json",
    )

    # Place the output JSON into your artifacts area:
    guard_output_prefix = Join(on="", values=[S3_ARTIFACTS, "/schema-guard/", p_exec_id, "/"])

    guard_step = ProcessingStep(
        name="SchemaGuard",
        processor=guard,
        code="sagemaker/code/schema_guard.py",  # this script is uploaded to YOUR BUCKET via pipe_sess
        job_arguments=[
            "--s3-input", p_input_s3,
            "--contract-json", p_feature_json,
            # Pipeline booleans don’t serialize directly into argv; resolve the
            # parameter's default to a literal string at build time:
            "--allow-extras", "true" if p_allow_extras.default_value else "false",
        ],
        outputs=[
            ProcessingOutput(
                output_name="output",
                source="/opt/ml/processing/output",
                destination=guard_output_prefix,  # lives under S3_ARTIFACTS
            )
        ],
        property_files=[schema_report],
    )

    # =========================================================================
    # Step 2 — ModelStep: create a Model the Transform step can reference
    # =========================================================================
    # We DO NOT call sk_model.transformer() here (that would eagerly create a Model).
    # Using ModelStep defers creation to pipeline runtime and exposes ModelName as a property.
    sk_model = SKLearnModel(
        model_data=model_data_url,        # from Approved package
        image_uri=image_uri,
        role=SM_ROLE_ARN,
        entry_point="inference.py",       # your lab's inference code
        source_dir="sagemaker/code",
        sagemaker_session=pipe_sess,      # code tarball → YOUR BUCKET (code_location below)
        code_location=f"{S3_ARTIFACTS}/code",
        env={
            "FEATURE_LIST_JSON": p_feature_json,
            "EVAL_JSON_S3": eval_json_s3 or "",  # optional; inference.py tolerates empty
        },
    )

    # Capture the model-creation arguments (no API call yet; PipelineSession intercepts)
    model_step_args = sk_model.create()   # Model name will be auto-generated at execution time

    model_step = ModelStep(
        name="CreateModel",
        step_args=model_step_args,
    )

    # =========================================================================
    # Step 3 — TransformStep: run batch scoring (references ModelStep.ModelName)
    # =========================================================================
    out_prefix = Join(on="", values=[p_output_prefix, p_exec_id, "/"])  # s3://.../batch/<exec_id>/

    transformer = Transformer(
        model_name=model_step.properties.ModelName,   # pipeline variable bound at runtime
        instance_type=p_instance_type,
        instance_count=1,
        strategy="MultiRecord",
        assemble_with="Line",
        output_path=out_prefix,
        max_payload=6,                    # MB per request payload chunk
        max_concurrent_transforms=2,      # modest parallelism for the lab
        sagemaker_session=pipe_sess,      # ensures any helper uploads use YOUR BUCKET
    )

    # Build transform step args via the high-level API (captured, not executed)
    transform_step_args = transformer.transform(
        data=p_input_s3,
        content_type="text/csv",
        split_type="Line",
    )

    transform_step = TransformStep(
        name="BatchTransform",
        step_args=transform_step_args,
    )

    # =========================================================================
    # Condition: proceed only if the guard said "status": "ok".
    # We read the JSON emitted by schema_guard.py via JsonGet on the
    # PropertyFile; schema_guard.py writes "status": "ok" | "error".
    # =========================================================================
    cond = ConditionStep(
        name="IfSchemaOK",
        conditions=[
            ConditionEquals(
                left=JsonGet(
                    step_name=guard_step.name,
                    property_file=schema_report,
                    json_path="status",        # <— matches your current schema_guard.py
                ),
                right="ok",
            ),
        ],
        if_steps=[model_step, transform_step],  # only run model + transform when schema is OK
        else_steps=[
            FailStep(
                name="FailMissingColumns",
                error_message="Schema guard failed: missing/invalid columns",
            )
        ],
    )

    # Final pipeline object (still nothing has run yet)
    pipeline = Pipeline(
        name=f"{LABP}-batch-transform-pipeline",
        parameters=[p_input_s3, p_feature_json, p_allow_extras, p_instance_type, p_exec_id, p_output_prefix],
        steps=[guard_step, cond],
        sagemaker_session=pipe_sess,
    )
    return pipeline


# =============================================================================
# Upsert the pipeline and kick off one execution
# =============================================================================
def upsert_and_run():
    pipeline = build_pipeline()
    pipeline.upsert(role_arn=SM_ROLE_ARN)

    # Fresh ExecId so your output path is deterministic and known at submit time
    exec_id = datetime.utcnow().strftime("%Y%m%d-%H%M%S")
    execution = pipeline.start(parameters={"ExecId": exec_id})

    print("Pipeline started:", execution.arn)
    print("ExecId:", exec_id)

    # Quiet waiter — we don’t stream logs here (students can explore in Console)
    while True:
        d = execution.describe()
        status = d["PipelineExecutionStatus"]
        if status in ("Succeeded", "Failed", "Stopped"):
            print("Pipeline status:", status)
            break
        time.sleep(15)

    # Deterministic output path (we stamped ExecId)
    print("Output prefix:", f"{S3_ARTIFACTS}/batch/{exec_id}/")


if __name__ == "__main__":
    upsert_and_run()

A few notes on the above:

  • We resolve the latest Approved package at submit time (so the pipeline uses the right container/model_data and we can pass EVAL_JSON_S3 if you want the threshold prints).
  • We keep code tarballs in your project bucket via code_location=f"{S3_ARTIFACTS}/code".
  • We stamp outputs with an ExecId parameter so you can know (and script) the final S3 location before execution.
  • The schema guard writes a JSON report; the pipeline fails if the script exits non-zero. Missing required columns is where you’d opt to raise SystemExit in the guard if you prefer a hard fail at the script level.
If you prefer that hard Boolean gate: have schema_guard.py raise SystemExit(2) when missing_required is non-empty. The ProcessingStep will then fail, and the pipeline stops without needing a ConditionStep.
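That hard-fail variant can be sketched as a small helper (the name `hard_fail_on_missing` is illustrative; it condenses the required-columns check from `_analyze`):

```python
def hard_fail_on_missing(header_cols, contract_cols):
    """Exit non-zero if any required (non-label) column is absent."""
    required = [c for c in contract_cols if c != "label"]
    missing = sorted(set(required) - set(header_cols))
    if missing:
        # SystemExit(2) fails the ProcessingStep, stopping the pipeline
        # without any ConditionStep involvement.
        raise SystemExit(2)
    return missing  # [] when the header satisfies the contract
```

The trade-off: a hard fail is simpler, but you lose the explicit, auditable branch (and the FailStep's error message) that the ConditionStep gives you.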

Step 3 — Run from the terminal

# 1) load env
source ~/mlops-env.sh

# 2) submit + run (defines pipeline if missing, then starts an execution)
python scripts/submit_pipeline_batch.py

Typical output (minimal):

Pipeline started: arn:aws:sagemaker:ap-northeast-2:123456789012:pipeline/<name>/execution/<id>
ExecId: 20250827-010423
Pipeline status: Succeeded
Output prefix: s3://<bucket>/artifacts/batch/<timestamp>/

Tip: Be patient. A full execution can take several minutes while Processing and Transform instances provision.


Step 4 — Verify results

  • Check Inference Output
# list the output object(s)
OUT='<copy output prefix from output of python job>'
aws s3 ls "$OUT"

# stream first 10 lines to verify header and a few rows
aws s3 cp "${OUT}test.csv.out" - | head -n 10
  • Check uploaded code from pipeline
    • You will likely see that two sagemaker-scikit-learn directories have been created

    This is expected behavior when using SageMaker's SKLearnProcessor and PipelineSession.

    Each pipeline execution uploads your code (e.g., schema_guard.py) to a unique S3 location, often with a timestamp or execution ID in the folder name.

    Why does this happen?

    • Each pipeline run creates a new code tarball and uploads it to your specified bucket.
    • The folder names (like sagemaker-scikit-learn-2025-08-27-08-59-57-407/ and sagemaker-scikit-learn-2025-08-27-08-59-57-784/) are unique to each run, ensuring isolation and reproducibility.

    Is this a problem?

    • No, it's normal and safe. SageMaker manages these uploads automatically.
    • You can clean up old folders if you want to save space, but they do not affect future runs.

    Summary: each pipeline execution uploads code to its own unique S3 folder, keeping runs reproducible and isolated.


Step 5 — Version Control

Don’t forget to add the latest code to your Git.

Challenge: ruff will complain about a few items that it cannot auto-correct. We have learned various ways to overcome this, including manual fixes and bypassing rules. Another, very fun way to fix it is to have GitHub Copilot help you with it. Give it a try.

Why this design works

  • Resilient to drift: the schema guard (smoke test) cheaply catches missing columns; your inference script (Pattern A) safely ignores extra columns and reorders to the contract.
  • Self-contained: all code bundles (source_dir.tar.gz) and outputs live under your project bucket, not the default SM bucket.
  • Deterministic outputs: a known ExecId → easy downstream automation and testing.
  • Quiet logs: the submitter prints only pipeline status + final output prefix.
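The "ignore extras, reorder to the contract" behavior can be sketched with a toy pandas example (illustrative only; your inference.py implements the real Pattern A logic):

```python
import pandas as pd

contract = ["f0", "f1", "f2"]  # required order from columns.json (label excluded)

# Incoming batch with an extra column and shuffled order:
df = pd.DataFrame({"f2": [0.3], "extra": [9], "f0": [0.1], "f1": [0.2]})

# reindex drops extras and restores contract order; a truly missing column
# would surface as NaN here, which the schema guard prevents upstream.
aligned = df.reindex(columns=contract)
print(list(aligned.columns))  # ['f0', 'f1', 'f2']
```

Together the two layers split the work cleanly: the guard stops the run on missing columns, while inference silently tolerates benign extras.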

Mini-glossary

  • Smoke test: a very quick, low-cost check to catch obvious breakages (e.g., missing columns) before running heavier jobs. Think “does it even turn on?”
  • Schema/contract: the agreed set and order of features that both producers and consumers must honor (here: columns.json from preprocessing).
  • Data-source drift: columns or distributions change over time. We treat extra columns as benign (drop), but missing required columns as a stop-the-line issue.
  • ProcessingStep: a managed container run for data prep/validation scripts.
  • TransformStep: a managed, ephemeral batch inference job that reads from S3, writes to S3.
  • Pipeline: a DAG of steps you version and run; great for repeatable, auditable batch workflows.

How the DAG is built in your pipeline

  • Unlike Airflow, the DAG is formed by references, not by list order.

    In SageMaker Pipelines, a node (step) becomes part of the graph when:

    1. it’s listed in Pipeline(steps=[...]), or
    2. it’s referenced by another step that is listed (e.g., inside a ConditionStep’s if_steps/else_steps, a StepCollection, or via a property reference like other_step.properties.*).
  • Why only guard_step and cond appear in Pipeline(... steps=[...]):

    model_step and transform_step are children of the ConditionStep (cond). Because you passed them to ConditionStep(if_steps=[model_step, transform_step]), they are included in the compiled DAG even though they are not top-level items in the steps=[...] list. There is no default model or transform in a pipeline — they’re present because cond references them.

  • Execution order is determined by edges, not the array index.

    The steps=[guard_step, cond] list is just a container of entry points. The actual order comes from the dependencies you’ve expressed:

    • cond depends on guard_step (its JsonGet reads guard_step’s PropertyFile, and you’ve placed guard_step as a separate step the condition relies on).
    • model_step and transform_step depend on cond (because they are inside cond.if_steps).

    SageMaker will topologically sort the graph and run steps as soon as their prerequisites are satisfied. If two steps don’t depend on each other, they’ll run in parallel.
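To make "topological sort, not array index" concrete, here is a tiny stdlib sketch using Python's graphlib (conceptual only; SageMaker performs this compilation for you):

```python
from graphlib import TopologicalSorter

# Edges mirroring this lab's DAG. BatchTransform also depends on CreateModel
# because the Transformer references model_step.properties.ModelName.
deps = {
    "SchemaGuard": set(),
    "IfSchemaOK": {"SchemaGuard"},
    "CreateModel": {"IfSchemaOK"},
    "BatchTransform": {"IfSchemaOK", "CreateModel"},
}

order = list(TopologicalSorter(deps).static_order())
print(order)  # ['SchemaGuard', 'IfSchemaOK', 'CreateModel', 'BatchTransform']
```

Note that the dict above could list the steps in any order and the result would be identical: only the edges matter, which is exactly the point.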

Ways to express dependencies

You’ve already used two of the main patterns:

  1. Property/argument dependency (data or metadata flow)

    Example: using JsonGet over guard_step’s PropertyFile inside the ConditionStep. That reference creates an edge guard_step → cond.

  2. Step collections (branching)

    ConditionStep(if_steps=[...], else_steps=[...]) nests steps and creates edges from the ConditionStep to the nested steps.

There’s also an explicit:

  • Manual ordering with depends_on (when there is no data/property link but you still want an order):
post_step = ProcessingStep(
    name="SummarizeResults",
    processor=guard,  # could be any processor / step type
    code="sagemaker/code/summarize.py",
    depends_on=[transform_step],   # <- purely ordering, no property flow
)

This tells the compiler “don’t start SummarizeResults until transform_step is done,” even if summarize.py doesn’t read anything from transform_step’s properties.

↑ Back to top